Remove unnecessary tests from MoqtSessionTest. Putting the logic in OutgoingSubgroupStreamTest and SubscriptionPublisherTest is more compact and cleaner conceptually.
Includes a small fix to allow opening several streams at once.
Here is a mapping of the deleted tests and what covers them:
OSST = OutgoingSubgroupStreamTest
SPT = SubscriptionPublisherTest
CreateOutgoingSubgroupStreamAndSend -> CreateStream via
SPT::UpdatePriorityWithActiveStreams
FinDataStreamFromCache -> OSST::OnCanWriteSetsAlarm
SendFragmentedObject -> OSST::SendFragmentedObject
GroupAbandonedNoDeliveryTimeout -> SPT::OnGroupAbandonedTooFarBehind
GroupAbandondedDeliveryTimeout is the same as GroupAbandonedNoDeliveryTimeout. This is an identical test!
GroupAbandoned -> SPT::OnGroupAbandonedWithStreams
LateFinInDataStream -> OSST::Fin
SeparateFinForFutureObject -> OSST::FinForFutureObject
PublisherAbandonsSubgrpup -> SPT::OnSubgroupAbandonedWithStream
UnidirectionalStreamCannotBeOpened ->
SPT::OnNewObjectAvailableStreamCreationBlocked,
SPT::OnCanCreateNewUniStreamSuccess
QueuedStreamIsCleared -> SPT::OnCanCreateNewUniStreamSuccess,
SPT::OnCanCreateNewUniStreamPendingCleanup
OutgoingStreamDisappears -> SPT::OnDataStreamDestroyed
SendDatagram -> SPT::OnNewObjectAvailableDatagram
QueuedStreamsOpenedInOrder -> SPT::PendingStreamsInOrder
StreamQueuedForSubscriptionThatDoesntExist ->
SPT::OnCanCreateNewUniStreamPendingCleanup
QueuedStreamPriorityChanged -> MoqtSessionTest::UpdateTrackPriority,
SPT::PendingStreamsInOrder,
SPT::UpdatePriorityWithPendingStreams
DeliveryTimeoutExpiredOnArrival -> OSST::OnCanWriteTimeout
DeliveryTimeoutAfterIntegratedFin -> OSST::OnCanWriteSetsAlarm
DeliveryTimeoutAfterSeparateFin -> OSST::Fin
DeliveryTimeoutAlternateDesign -> AlternateDeliveryTimeout
IncomingRequestUpdateTruncatesSubscription ->
MoqtSessionTest::IncomingRequestUpdateTriggersRequestOk,
MoqtSessionTest::IncomingRequestUpdateTriggersRequestError,
SPT::IncomingUpdateTruncatesSubscription
PiperOrigin-RevId: 918686567
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc
index ffa3e3d..d715290 100644
--- a/quiche/quic/moqt/moqt_integration_test.cc
+++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -763,6 +763,10 @@
std::variant<SubscribeOkData, MoqtRequestErrorInfo> response) {
received_ok = std::holds_alternative<SubscribeOkData>(response);
});
+ bool stream_reset = false;
+ EXPECT_CALL(subscribe_visitor_, OnStreamReset).WillOnce([&]() {
+ stream_reset = true;
+ });
MessageParameters parameters(MoqtFilterType::kLargestObject);
// Set delivery timeout to ~ 1 RTT: any loss is fatal.
parameters.delivery_timeout = quic::QuicTimeDelta::FromMilliseconds(100);
@@ -780,15 +784,13 @@
[&](const FullTrackName&, const PublishedObjectMetadata& metadata,
absl::string_view object,
uint64_t offset) { bytes_received += object.size(); });
- queue->AddObject(Location{0, 0}, 0, data, false);
- queue->AddObject(Location{0, 1}, 0, data, false);
- queue->AddObject(Location{0, 2}, 0, data, false);
- queue->AddObject(Location{0, 3}, 0, data, true);
- success = test_harness_.RunUntilWithDefaultTimeout([&]() {
- return MoqtSessionPeer::SubgroupHasBeenReset(
- MoqtSessionPeer::GetSubscription(server_->session(), 0),
- DataStreamIndex{0, 0});
- });
+ quic::QuicTime now = test_harness_.simulator().GetClock()->Now();
+ queue->AddObject(Location{0, 0}, 0, data, false, now);
+ queue->AddObject(Location{0, 1}, 0, data, false, now);
+ queue->AddObject(Location{0, 2}, 0, data, false, now);
+ queue->AddObject(Location{0, 3}, 0, data, true, now);
+ success =
+ test_harness_.RunUntilWithDefaultTimeout([&]() { return stream_reset; });
EXPECT_TRUE(success);
// Stream was reset before all the bytes arrived.
EXPECT_LT(bytes_received, 4000);
@@ -812,6 +814,10 @@
std::variant<SubscribeOkData, MoqtRequestErrorInfo> response) {
received_ok = std::holds_alternative<SubscribeOkData>(response);
});
+ bool stream_reset = false;
+ EXPECT_CALL(subscribe_visitor_, OnStreamReset).WillOnce([&]() {
+ stream_reset = true;
+ });
MessageParameters parameters(MoqtFilterType::kLargestObject);
// Set delivery timeout to ~ 1 RTT: any loss is fatal.
parameters.delivery_timeout = quic::QuicTimeDelta::FromMilliseconds(100);
@@ -831,13 +837,11 @@
[&](const FullTrackName&, const PublishedObjectMetadata& metadata,
absl::string_view object,
uint64_t offset) { bytes_received += object.size(); });
- queue->AddObject(Location{0, 0}, 0, data, false);
- queue->AddObject(Location{1, 0}, 0, data, false);
- success = test_harness_.RunUntilWithDefaultTimeout([&]() {
- return MoqtSessionPeer::SubgroupHasBeenReset(
- MoqtSessionPeer::GetSubscription(server_->session(), 0),
- DataStreamIndex{0, 0});
- });
+ quic::QuicTime now = test_harness_.simulator().GetClock()->Now();
+ queue->AddObject(Location{0, 0}, 0, data, false, now);
+ queue->AddObject(Location{1, 0}, 0, data, false, now);
+ success =
+ test_harness_.RunUntilWithDefaultTimeout([&]() { return stream_reset; });
EXPECT_TRUE(success);
EXPECT_EQ(bytes_received, 2000);
}
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index 66c620e..7802cff 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -18,7 +18,6 @@
#include "absl/base/casts.h"
#include "absl/memory/memory.h"
#include "absl/status/status.h"
-#include "absl/strings/match.h"
#include "absl/strings/string_view.h"
#include "absl/types/span.h"
#include "quiche/quic/core/quic_data_reader.h"
@@ -124,20 +123,6 @@
return fetch;
}
-// TODO(martinduke): Eliminate MoqtSessionPeer::AddSubscription, which allows
-// this to be removed as well.
-static std::shared_ptr<MockTrackPublisher> SetupPublisher(
- FullTrackName track_name, MoqtForwardingPreference forwarding_preference,
- std::optional<Location> largest_sequence) {
- auto publisher = std::make_shared<MockTrackPublisher>(std::move(track_name));
- ON_CALL(*publisher, largest_location())
- .WillByDefault(Return(largest_sequence));
- ON_CALL(*publisher, extensions())
- .WillByDefault(testing::ReturnRef(kNoExtensions));
- ON_CALL(*publisher, expiration()).WillByDefault(Return(std::nullopt));
- return publisher;
-}
-
std::optional<MoqtMessageType> PeekControlMessageType(absl::string_view data) {
quiche::QuicheDataReader reader(data);
uint64_t varint;
@@ -549,8 +534,8 @@
EXPECT_CALL(mock_stream_,
Writev(ControlMessageOfType(MoqtMessageType::kSubscribeOk), _));
listener->OnSubscribeAccepted();
- EXPECT_NE(MoqtSessionPeer::GetSubscription(&session_, kDefaultPeerRequestId),
- nullptr);
+ EXPECT_TRUE(MoqtSessionPeer::RequestIdIsSubscriptionPublisher(
+ &session_, kDefaultPeerRequestId));
}
TEST_F(MoqtSessionTest, AsynchronousSubscribeReturnsError) {
@@ -567,8 +552,8 @@
Writev(ControlMessageOfType(MoqtMessageType::kRequestError), _));
listener->OnSubscribeRejected(MoqtRequestErrorInfo(
RequestErrorCode::kInternalError, std::nullopt, "Test error"));
- EXPECT_EQ(MoqtSessionPeer::GetSubscription(&session_, kDefaultPeerRequestId),
- nullptr);
+ EXPECT_FALSE(MoqtSessionPeer::RequestIdIsSubscriptionPublisher(
+ &session_, kDefaultPeerRequestId));
}
TEST_F(MoqtSessionTest, SynchronousSubscribeReturnsError) {
@@ -586,8 +571,8 @@
RequestErrorCode::kInternalError, std::nullopt, "Test error"));
});
stream_input->ReceiveMessage(request);
- EXPECT_EQ(MoqtSessionPeer::GetSubscription(&session_, kDefaultPeerRequestId),
- nullptr);
+ EXPECT_FALSE(MoqtSessionPeer::RequestIdIsSubscriptionPublisher(
+ &session_, kDefaultPeerRequestId));
}
TEST_F(MoqtSessionTest, SubscribeForPast) {
@@ -676,7 +661,7 @@
kDefaultPeerRequestId,
};
stream_input->ReceiveMessage(unsubscribe);
- EXPECT_EQ(MoqtSessionPeer::GetSubscription(&session_, 1), nullptr);
+ EXPECT_FALSE(MoqtSessionPeer::RequestIdIsSubscriptionPublisher(&session_, 1));
// Subscribe again, succeeds.
request.request_id = 3;
@@ -1302,816 +1287,21 @@
control_stream->ReceiveMessage(subscribe_ok);
}
-// TODO(martinduke): Most of these test cases no longer need to be in
-// MoqtSessionTest. Find any useful functionality and put it in
-// SubscriptionPublisherTest or OutgoingSubgroupStreamTest.
-
-TEST_F(MoqtSessionTest, CreateOutgoingSubgroupStreamAndSend) {
- FullTrackName ftn("foo", "bar");
- auto track =
- SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(4, 2));
- std::unique_ptr<MoqtBidiStreamTestWrapper> control_stream =
- MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
- MoqtObjectListener* subscription =
- MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
-
- EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
- .WillOnce(Return(true));
- bool fin = false;
- EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return !fin; });
- EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
- .WillOnce(Return(&mock_stream_));
- std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
- EXPECT_CALL(mock_stream_, SetVisitor(_))
- .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
- stream_visitor = std::move(visitor);
- });
- EXPECT_CALL(mock_stream_, visitor()).WillOnce([&] {
- return stream_visitor.get();
- });
- EXPECT_CALL(mock_stream_, GetStreamId())
- .WillRepeatedly(Return(kOutgoingUniStreamId));
- EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
- .WillRepeatedly(Return(&mock_stream_));
-
- // Verify first six message fields are sent correctly
- bool correct_message = false;
- const std::string kExpectedMessage = {0x11, 0x02, 0x05, 0x7f, 0x00, 0x0a};
- EXPECT_CALL(mock_stream_, Writev(_, _))
- .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
- const webtransport::StreamWriteOptions& options) {
- correct_message =
- absl::StartsWith(data[0].AsStringView(), kExpectedMessage);
- fin |= options.send_fin();
- return absl::OkStatus();
- });
- EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0))
- .WillRepeatedly([&] {
- return PublishedObject{
- PublishedObjectMetadata{Location(5, 0), 0, "extensions",
- MoqtObjectStatus::kNormal, 127, 8,
- MoqtSessionPeer::Now(&session_)},
- PayloadFromString("deadbeef"), false};
- });
- EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).WillRepeatedly([] {
- return std::optional<PublishedObject>();
- });
- subscription->OnNewObjectAvailable(Location(5, 0), 0, 127);
- EXPECT_TRUE(correct_message);
- EXPECT_FALSE(fin);
- std::optional<Location> largest_sent =
- MoqtSessionPeer::LargestSentForSubscription(&session_, 0);
- EXPECT_TRUE(largest_sent.has_value() && *largest_sent == Location(5, 0));
-}
-
-TEST_F(MoqtSessionTest, FinDataStreamFromCache) {
- FullTrackName ftn("foo", "bar");
- auto track =
- SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(4, 2));
- std::unique_ptr<MoqtBidiStreamTestWrapper> control_stream =
- MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
- MoqtObjectListener* subscription =
- MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
-
- EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
- .WillOnce(Return(true));
- bool fin = false;
- EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return !fin; });
- EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
- .WillOnce(Return(&mock_stream_));
- std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
- EXPECT_CALL(mock_stream_, SetVisitor(_))
- .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
- stream_visitor = std::move(visitor);
- });
- EXPECT_CALL(mock_stream_, visitor()).WillOnce([&] {
- return stream_visitor.get();
- });
- EXPECT_CALL(mock_stream_, GetStreamId())
- .WillRepeatedly(Return(kOutgoingUniStreamId));
- EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
- .WillRepeatedly(Return(&mock_stream_));
-
- // Verify first four message fields are sent correctly
- bool correct_message = false;
- const std::string kExpectedMessage = {0x11, 0x02, 0x05, 0x7f};
- EXPECT_CALL(mock_stream_, Writev(_, _))
- .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
- const webtransport::StreamWriteOptions& options) {
- correct_message =
- absl::StartsWith(data[0].AsStringView(), kExpectedMessage);
- fin = options.send_fin();
- return absl::OkStatus();
- });
- EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0))
- .WillRepeatedly([&] {
- return PublishedObject{
- PublishedObjectMetadata{Location(5, 0), 0, "",
- MoqtObjectStatus::kNormal, 127, 8,
- MoqtSessionPeer::Now(&session_)},
- PayloadFromString("deadbeef"), true};
- });
- EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).WillRepeatedly([] {
- return std::optional<PublishedObject>();
- });
- subscription->OnNewObjectAvailable(Location(5, 0), 0,
- kDefaultPublisherPriority);
- EXPECT_TRUE(correct_message);
- EXPECT_TRUE(fin);
-}
-
-TEST_F(MoqtSessionTest, SendFragmentedObject) {
- FullTrackName ftn("foo", "bar");
- auto track =
- SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(4, 2));
- std::unique_ptr<MoqtBidiStreamTestWrapper> control_stream =
- MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
- MoqtObjectListener* subscription =
- MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
-
- // 1st OnNewObjectAvailable happens when no outgoing unidirectional stream is
- // available.
- EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
- .WillOnce(Return(false));
- subscription->OnNewObjectAvailable(Location(5, 0), 0, 128);
-
- // 2nd OnNewObjectAvailable happens when Part 1 ("part1", 5 bytes) and
- // Part 2 ("part2", 5 bytes) are available.
- bool fin = false;
- EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
- .WillOnce(Return(true));
- EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return !fin; });
- EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
- .WillOnce(Return(&mock_stream_));
- std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
- EXPECT_CALL(mock_stream_, SetVisitor(_))
- .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
- stream_visitor = std::move(visitor);
- });
- EXPECT_CALL(mock_stream_, visitor()).WillRepeatedly([&] {
- return stream_visitor.get();
- });
- EXPECT_CALL(mock_stream_, GetStreamId())
- .WillRepeatedly(Return(kOutgoingUniStreamId));
- EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
- .WillRepeatedly(Return(&mock_stream_));
- PublishedObjectMetadata metadata = {
- Location(5, 0),
- /*subgroup=*/0,
- /*extensions=*/"", MoqtObjectStatus::kNormal,
- /*priority=*/128,
- /*payload_length=*/15, MoqtSessionPeer::Now(&session_)};
- std::vector<quiche::QuicheMemSlice> payload;
- payload.push_back(quiche::QuicheMemSlice::Copy("part1"));
- payload.push_back(quiche::QuicheMemSlice::Copy("part2"));
- EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0))
- .WillOnce(Return(PublishedObject{metadata, std::move(payload), false}));
- // Expect Writev for the header and first two parts.
- EXPECT_CALL(mock_stream_, Writev)
- .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
- const webtransport::StreamWriteOptions& options) {
- EXPECT_EQ(data.size(), 3); // Header, part1, and part2.
- EXPECT_EQ(data[1].AsStringView(), "part1");
- EXPECT_EQ(data[2].AsStringView(), "part2");
- EXPECT_FALSE(options.send_fin());
- return absl::OkStatus();
- });
- subscription->OnNewObjectAvailable(Location(5, 0), 0, 128);
-
- // 3rd OnNewObjectAvailable happens when Part 3 ("part3", 5 bytes) is
- // available.
- EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 10))
- .WillOnce(
- Return(PublishedObject{metadata, PayloadFromString("part3"), true}));
- EXPECT_CALL(mock_stream_, Writev)
- .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
- const webtransport::StreamWriteOptions& options) {
- EXPECT_TRUE(options.send_fin());
- EXPECT_EQ(data.size(), 1); // No header, just part3.
- EXPECT_EQ(data[0].AsStringView(), "part3");
- fin = true;
- return absl::OkStatus();
- });
- subscription->OnNewObjectAvailable(Location(5, 0), 0, 128);
-}
-
-TEST_F(MoqtSessionTest, GroupAbandonedNoDeliveryTimeout) {
- FullTrackName ftn("foo", "bar");
- webtransport::test::MockStream control_stream;
- std::unique_ptr<MoqtBidiStreamTestWrapper> stream_input =
- MoqtSessionPeer::CreateControlStream(&session_, &control_stream);
- auto track =
- SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(4, 2));
- MoqtObjectListener* subscription =
- MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
-
- EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
- .WillOnce(Return(true));
- bool fin = false;
- EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return !fin; });
- EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
- .WillOnce(Return(&mock_stream_));
- std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
- EXPECT_CALL(mock_stream_, SetVisitor(_))
- .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
- stream_visitor = std::move(visitor);
- });
- EXPECT_CALL(mock_stream_, visitor()).WillOnce([&] {
- return stream_visitor.get();
- });
- EXPECT_CALL(mock_stream_, GetStreamId())
- .WillRepeatedly(Return(kOutgoingUniStreamId));
- EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
- .WillRepeatedly(Return(&mock_stream_));
-
- // Verify first four message fields are sent correctly
- bool correct_message = false;
- const std::string kExpectedMessage = {0x11, 0x02, 0x05, 0x7f};
- EXPECT_CALL(mock_stream_, Writev(_, _))
- .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
- const webtransport::StreamWriteOptions& options) {
- correct_message =
- absl::StartsWith(data[0].AsStringView(), kExpectedMessage);
- fin |= options.send_fin();
- return absl::OkStatus();
- });
- EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0))
- .WillRepeatedly([&] {
- return PublishedObject{
- PublishedObjectMetadata{Location(5, 0), 0, "",
- MoqtObjectStatus::kNormal, 127, 8,
- MoqtSessionPeer::Now(&session_)},
- PayloadFromString("deadbeef"), true};
- });
- EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).WillRepeatedly([] {
- return std::optional<PublishedObject>();
- });
- subscription->OnNewObjectAvailable(Location(5, 0), 0,
- kDefaultPublisherPriority);
- EXPECT_TRUE(correct_message);
- EXPECT_TRUE(fin);
-
- struct MoqtPublishDone expected_publish_done = {
- /*request_id=*/0,
- PublishDoneCode::kTooFarBehind,
- /*stream_count=*/1,
- /*error_reason=*/"",
- };
- EXPECT_CALL(mock_stream_, ResetWithUserCode(kResetCodeCancelled));
- EXPECT_CALL(control_stream,
- Writev(SerializedControlMessage(expected_publish_done), _));
- subscription->OnGroupAbandoned(5);
-}
-
-TEST_F(MoqtSessionTest, GroupAbandonedDeliveryTimeout) {
- FullTrackName ftn("foo", "bar");
- webtransport::test::MockStream control_stream;
- std::unique_ptr<MoqtBidiStreamTestWrapper> stream_input =
- MoqtSessionPeer::CreateControlStream(&session_, &control_stream);
- auto track =
- SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(4, 2));
- MoqtObjectListener* subscription =
- MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
-
- EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
- .WillOnce(Return(true));
- bool fin = false;
- EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return !fin; });
- EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
- .WillOnce(Return(&mock_stream_));
- std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
- EXPECT_CALL(mock_stream_, SetVisitor(_))
- .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
- stream_visitor = std::move(visitor);
- });
- EXPECT_CALL(mock_stream_, visitor()).WillOnce([&] {
- return stream_visitor.get();
- });
- EXPECT_CALL(mock_stream_, GetStreamId())
- .WillRepeatedly(Return(kOutgoingUniStreamId));
- EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
- .WillRepeatedly(Return(&mock_stream_));
-
- // Verify first four message fields are sent correctly
- bool correct_message = false;
- const std::string kExpectedMessage = {0x11, 0x02, 0x05, 0x7f};
- EXPECT_CALL(mock_stream_, Writev(_, _))
- .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
- const webtransport::StreamWriteOptions& options) {
- correct_message =
- absl::StartsWith(data[0].AsStringView(), kExpectedMessage);
- fin |= options.send_fin();
- return absl::OkStatus();
- });
- EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0))
- .WillRepeatedly([&] {
- return PublishedObject{
- PublishedObjectMetadata{Location(5, 0), 0, "",
- MoqtObjectStatus::kNormal, 127, 8,
- MoqtSessionPeer::Now(&session_)},
- PayloadFromString("deadbeef"), true};
- });
- EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).WillRepeatedly([] {
- return std::optional<PublishedObject>();
- });
- subscription->OnNewObjectAvailable(Location(5, 0), 0,
- kDefaultPublisherPriority);
- EXPECT_TRUE(correct_message);
- EXPECT_TRUE(fin);
-
- struct MoqtPublishDone expected_publish_done = {
- /*request_id=*/0,
- PublishDoneCode::kTooFarBehind,
- /*stream_count=*/1,
- /*error_reason=*/"",
- };
- EXPECT_CALL(mock_stream_, ResetWithUserCode(kResetCodeCancelled));
- EXPECT_CALL(control_stream,
- Writev(SerializedControlMessage(expected_publish_done), _));
- subscription->OnGroupAbandoned(5);
-}
-
-TEST_F(MoqtSessionTest, GroupAbandoned) {
- FullTrackName ftn("foo", "bar");
- auto track =
- SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(4, 2));
- std::unique_ptr<MoqtBidiStreamTestWrapper> control_stream =
- MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
- MoqtObjectListener* subscription =
- MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
- MoqtSessionPeer::SetDeliveryTimeout(subscription,
- quic::QuicTimeDelta::FromSeconds(1000));
-
- EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
- .WillOnce(Return(true));
- bool fin = false;
- EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return !fin; });
- EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
- .WillOnce(Return(&mock_stream_));
- std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
- EXPECT_CALL(mock_stream_, SetVisitor(_))
- .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
- stream_visitor = std::move(visitor);
- });
- EXPECT_CALL(mock_stream_, visitor()).WillOnce([&] {
- return stream_visitor.get();
- });
- EXPECT_CALL(mock_stream_, GetStreamId())
- .WillRepeatedly(Return(kOutgoingUniStreamId));
- EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
- .WillRepeatedly(Return(&mock_stream_));
-
- // Verify first four message fields are sent correctly
- bool correct_message = false;
- const std::string kExpectedMessage = {0x11, 0x02, 0x05, 0x7f};
- EXPECT_CALL(mock_stream_, Writev(_, _))
- .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
- const webtransport::StreamWriteOptions& options) {
- correct_message =
- absl::StartsWith(data[0].AsStringView(), kExpectedMessage);
- fin |= options.send_fin();
- return absl::OkStatus();
- });
- EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0))
- .WillRepeatedly([&] {
- return PublishedObject{
- PublishedObjectMetadata{Location(5, 0), 0, "",
- MoqtObjectStatus::kNormal, 127, 8,
- MoqtSessionPeer::Now(&session_)},
- PayloadFromString("deadbeef"), true};
- });
- EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).WillRepeatedly([] {
- return std::optional<PublishedObject>();
- });
- subscription->OnNewObjectAvailable(Location(5, 0), 0,
- kDefaultPublisherPriority);
- EXPECT_TRUE(correct_message);
- EXPECT_TRUE(fin);
- EXPECT_CALL(mock_stream_, ResetWithUserCode(kResetCodeDeliveryTimeout));
- subscription->OnGroupAbandoned(5);
-}
-
-TEST_F(MoqtSessionTest, LateFinDataStream) {
- FullTrackName ftn("foo", "bar");
- auto track =
- SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(4, 2));
- std::unique_ptr<MoqtBidiStreamTestWrapper> control_stream =
- MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
- MoqtObjectListener* subscription =
- MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
-
- EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
- .WillOnce(Return(true));
- bool fin = false;
- EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return !fin; });
- EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
- .WillOnce(Return(&mock_stream_));
- std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
- EXPECT_CALL(mock_stream_, SetVisitor(_))
- .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
- stream_visitor = std::move(visitor);
- });
- EXPECT_CALL(mock_stream_, visitor()).WillRepeatedly([&] {
- return stream_visitor.get();
- });
- EXPECT_CALL(mock_stream_, GetStreamId())
- .WillRepeatedly(Return(kOutgoingUniStreamId));
- EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
- .WillRepeatedly(Return(&mock_stream_));
-
- // Verify first four message fields are sent correctly
- bool correct_message = false;
- const std::string kExpectedMessage = {0x11, 0x02, 0x05, 0x7f};
- EXPECT_CALL(mock_stream_, Writev)
- .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
- const webtransport::StreamWriteOptions& options) {
- correct_message =
- absl::StartsWith(data[0].AsStringView(), kExpectedMessage);
- fin = options.send_fin();
- return absl::OkStatus();
- });
- EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0))
- .WillRepeatedly([&] {
- return PublishedObject{
- PublishedObjectMetadata{Location(5, 0), 0, "",
- MoqtObjectStatus::kNormal, 127, 8,
- MoqtSessionPeer::Now(&session_)},
- PayloadFromString("deadbeef"), false};
- });
- EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).WillRepeatedly([] {
- return std::optional<PublishedObject>();
- });
- subscription->OnNewObjectAvailable(Location(5, 0), 0,
- kDefaultPublisherPriority);
- EXPECT_TRUE(correct_message);
- EXPECT_FALSE(fin);
- fin = false;
- EXPECT_CALL(mock_stream_, Writev)
- .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
- const webtransport::StreamWriteOptions& options) {
- EXPECT_TRUE(data.empty());
- fin = options.send_fin();
- return absl::OkStatus();
- });
- subscription->OnNewFinAvailable(Location(5, 0), 0);
-}
-
-TEST_F(MoqtSessionTest, SeparateFinForFutureObject) {
- FullTrackName ftn("foo", "bar");
- auto track =
- SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(4, 2));
- std::unique_ptr<MoqtBidiStreamTestWrapper> control_stream =
- MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
- MoqtObjectListener* subscription =
- MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
-
- EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
- .WillOnce(Return(true));
- bool fin = false;
- EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return !fin; });
- EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
- .WillOnce(Return(&mock_stream_));
- std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
- EXPECT_CALL(mock_stream_, SetVisitor(_))
- .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
- stream_visitor = std::move(visitor);
- });
- EXPECT_CALL(mock_stream_, visitor()).WillRepeatedly([&] {
- return stream_visitor.get();
- });
- EXPECT_CALL(mock_stream_, GetStreamId())
- .WillRepeatedly(Return(kOutgoingUniStreamId));
- EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
- .WillRepeatedly(Return(&mock_stream_));
-
- // Verify first six message fields are sent correctly
- bool correct_message = false;
- const std::string kExpectedMessage = {0x04, 0x02, 0x05, 0x7f, 0x00, 0x00};
- EXPECT_CALL(mock_stream_, Writev(_, _))
- .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
- const webtransport::StreamWriteOptions& options) {
- correct_message =
- absl::StartsWith(data[0].AsStringView(), kExpectedMessage);
- fin = options.send_fin();
- return absl::OkStatus();
- });
- EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0))
- .WillRepeatedly([&] {
- return PublishedObject{
- PublishedObjectMetadata{Location(5, 0), 0, "",
- MoqtObjectStatus::kNormal, 127, 8,
- MoqtSessionPeer::Now(&session_)},
- PayloadFromString("deadbeef"), false};
- });
- EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).WillRepeatedly([] {
- return std::optional<PublishedObject>();
- });
- subscription->OnNewObjectAvailable(Location(5, 0), 0,
- kDefaultPublisherPriority);
- EXPECT_FALSE(fin);
- // Try to deliver (5,1), but fail.
- EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return false; });
- EXPECT_CALL(*track, GetCachedObject).Times(0);
- EXPECT_CALL(mock_stream_, Writev).Times(0);
- subscription->OnNewObjectAvailable(Location(5, 1), 0,
- kDefaultPublisherPriority);
- // Notify that FIN arrived, but do nothing with it because (5, 1) isn't sent.
- EXPECT_CALL(mock_stream_, Writev).Times(0);
- subscription->OnNewFinAvailable(Location(5, 1), 0);
-
- // Reopen the window.
- correct_message = false;
- // object id, extensions, payload length, status.
- const std::string kExpectedMessage2 = {0x00, 0x00, 0x00, 0x03};
- EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return true; });
- EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0))
- .WillRepeatedly([&] {
- return PublishedObject{
- PublishedObjectMetadata{Location(5, 1), 0, "",
- MoqtObjectStatus::kEndOfGroup, 127, 0,
- MoqtSessionPeer::Now(&session_)},
- PayloadFromString(""), true};
- });
- EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 2, 0)).WillRepeatedly([] {
- return std::optional<PublishedObject>();
- });
- EXPECT_CALL(mock_stream_, Writev(_, _))
- .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
- const webtransport::StreamWriteOptions& options) {
- correct_message =
- absl::StartsWith(data[0].AsStringView(), kExpectedMessage2);
- fin = options.send_fin();
- return absl::OkStatus();
- });
- stream_visitor->OnCanWrite();
- EXPECT_TRUE(correct_message);
- EXPECT_TRUE(fin);
-}
-
-TEST_F(MoqtSessionTest, PublisherAbandonsSubgroup) {
- FullTrackName ftn("foo", "bar");
- auto track =
- SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(4, 2));
- std::unique_ptr<MoqtBidiStreamTestWrapper> control_stream =
- MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
- MoqtObjectListener* subscription =
- MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
-
- // Deliver first object.
- EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
- .WillOnce(Return(true));
- bool fin = false;
- EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return !fin; });
- EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
- .WillOnce(Return(&mock_stream_));
- std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
- EXPECT_CALL(mock_stream_, SetVisitor(_))
- .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
- stream_visitor = std::move(visitor);
- });
- EXPECT_CALL(mock_stream_, visitor()).WillRepeatedly([&] {
- return stream_visitor.get();
- });
- EXPECT_CALL(mock_stream_, GetStreamId())
- .WillRepeatedly(Return(kOutgoingUniStreamId));
- EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
- .WillRepeatedly(Return(&mock_stream_));
- // Verify first six message fields are sent correctly
- bool correct_message = false;
- const std::string kExpectedMessage = {0x04, 0x02, 0x05, 0x7f, 0x00, 0x00};
- EXPECT_CALL(mock_stream_, Writev(_, _))
- .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
- const webtransport::StreamWriteOptions& options) {
- correct_message =
- absl::StartsWith(data[0].AsStringView(), kExpectedMessage);
- fin = options.send_fin();
- return absl::OkStatus();
- });
- EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0))
- .WillRepeatedly([&] {
- return PublishedObject{
- PublishedObjectMetadata{Location(5, 0), 0, "",
- MoqtObjectStatus::kNormal, 127, 8,
- MoqtSessionPeer::Now(&session_)},
- PayloadFromString("deadbeef"), false};
- });
- EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).WillRepeatedly([] {
- return std::optional<PublishedObject>();
- });
- subscription->OnNewObjectAvailable(Location(5, 0), 0,
- kDefaultPublisherPriority);
-
- // Abandon the subgroup.
- EXPECT_CALL(mock_stream_, ResetWithUserCode(0x1)).Times(1);
- subscription->OnSubgroupAbandoned(5, 0, 0x1);
-}
-
-// TODO: Test operation with multiple streams.
-
-TEST_F(MoqtSessionTest, UnidirectionalStreamCannotBeOpened) {
- FullTrackName ftn("foo", "bar");
- auto track =
- SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(4, 2));
- std::unique_ptr<MoqtBidiStreamTestWrapper> control_stream =
- MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
- MoqtObjectListener* subscription =
- MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
-
- // Queue the outgoing stream.
- EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
- .WillOnce(Return(false));
- subscription->OnNewObjectAvailable(Location(5, 0), 0,
- kDefaultPublisherPriority);
-
- // Unblock the session, and cause the queued stream to be sent.
- EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
- .WillRepeatedly(Return(true));
- bool fin = false;
- EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return !fin; });
- EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
- .WillOnce(Return(&mock_stream_));
- std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
- EXPECT_CALL(mock_stream_, SetVisitor(_))
- .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
- stream_visitor = std::move(visitor);
- });
- EXPECT_CALL(mock_stream_, visitor()).WillOnce([&] {
- return stream_visitor.get();
- });
- webtransport::StreamPriority expected_priority{
- kMoqtSendGroupId,
- SendOrderForStream(kDefaultSubscriberPriority, kDefaultPublisherPriority,
- 5, 0, MoqtDeliveryOrder::kAscending)};
- EXPECT_CALL(mock_stream_, SetPriority(expected_priority));
- EXPECT_CALL(mock_stream_, GetStreamId())
- .WillRepeatedly(Return(kOutgoingUniStreamId));
- EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
- .WillRepeatedly(Return(&mock_stream_));
- EXPECT_CALL(mock_stream_, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
- EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0)).WillRepeatedly([] {
- return PublishedObject{PublishedObjectMetadata{
- Location(5, 0), 0, "", MoqtObjectStatus::kNormal,
- 128, 8, quic::QuicTime::Zero()},
- PayloadFromString("deadbeef")};
- });
- EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).WillRepeatedly([] {
- return std::optional<PublishedObject>();
- });
- session_.OnCanCreateNewOutgoingUnidirectionalStream();
-}
-
-TEST_F(MoqtSessionTest, QueuedStreamIsCleared) {
- FullTrackName ftn("foo", "bar");
- auto track =
- SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(4, 2));
- std::unique_ptr<MoqtBidiStreamTestWrapper> control_stream =
- MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
- MoqtObjectListener* subscription =
- MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
-
- // Queue the outgoing stream.
- EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
- .WillRepeatedly(Return(false));
- subscription->OnNewObjectAvailable(Location(5, 0), 0,
- kDefaultPublisherPriority);
- subscription->OnNewObjectAvailable(Location(6, 0), 0,
- kDefaultPublisherPriority);
- subscription->OnGroupAbandoned(5);
-
- // Unblock the session, and cause the queued stream to be sent. There should
- // be only one stream.
- EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
- .WillOnce(Return(true))
- .WillOnce(Return(true));
- bool fin = false;
- EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return !fin; });
- EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
- .WillOnce(Return(&mock_stream_));
- std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
- EXPECT_CALL(mock_stream_, SetVisitor(_))
- .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
- stream_visitor = std::move(visitor);
- });
- EXPECT_CALL(mock_stream_, visitor()).WillOnce([&] {
- return stream_visitor.get();
- });
- EXPECT_CALL(mock_stream_, GetStreamId())
- .WillRepeatedly(Return(kOutgoingUniStreamId));
- EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
- .WillRepeatedly(Return(&mock_stream_));
- EXPECT_CALL(mock_stream_, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
- EXPECT_CALL(*track, GetCachedObject(6, Optional(0), 0, 0))
- .WillRepeatedly([&] {
- return PublishedObject{
- PublishedObjectMetadata{Location(6, 0), 0, "",
- MoqtObjectStatus::kNormal, 128, 8,
- MoqtSessionPeer::Now(&session_)},
- PayloadFromString("deadbeef")};
- });
- EXPECT_CALL(*track, GetCachedObject(6, Optional(0), 1, 0)).WillRepeatedly([] {
- return std::optional<PublishedObject>();
- });
- session_.OnCanCreateNewOutgoingUnidirectionalStream();
-}
-
-TEST_F(MoqtSessionTest, OutgoingStreamDisappears) {
- FullTrackName ftn("foo", "bar");
- auto track =
- SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(4, 2));
- std::unique_ptr<MoqtBidiStreamTestWrapper> control_stream =
- MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
- MoqtObjectListener* subscription =
- MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
-
- // Set up an outgoing stream for a group.
- EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
- .WillOnce(Return(true));
- EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly(Return(true));
- EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
- .WillOnce(Return(&mock_stream_));
- std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
- EXPECT_CALL(mock_stream_, SetVisitor(_))
- .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
- stream_visitor = std::move(visitor);
- });
- EXPECT_CALL(mock_stream_, visitor()).WillRepeatedly([&] {
- return stream_visitor.get();
- });
- EXPECT_CALL(mock_stream_, GetStreamId())
- .WillRepeatedly(Return(kOutgoingUniStreamId));
- EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
- .WillRepeatedly(Return(&mock_stream_));
-
- EXPECT_CALL(mock_stream_, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
- EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0)).WillRepeatedly([] {
- return PublishedObject{
- PublishedObjectMetadata{Location(5, 0), 0, "",
- MoqtObjectStatus::kNormal, 128, 8},
- PayloadFromString("deadbeef")};
- });
- EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).WillOnce([] {
- return std::optional<PublishedObject>();
- });
- subscription->OnNewObjectAvailable(Location(5, 0), 0,
- kDefaultPublisherPriority);
- // Now that the stream exists and is recorded within subscription, make it
- // disappear by returning nullptr.
- EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
- .WillRepeatedly(Return(nullptr));
- EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).Times(0);
- subscription->OnNewObjectAvailable(Location(5, 1), 0,
- kDefaultPublisherPriority);
-}
-
TEST_F(MoqtSessionTest, ReceiveUnsubscribe) {
- FullTrackName ftn("foo", "bar");
- auto track =
- SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(4, 2));
- std::unique_ptr<MoqtBidiStreamTestWrapper> stream_input =
- MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
- MoqtSessionPeer::AddSubscription(&session_, track, 0, 1, 3, 4);
- MoqtUnsubscribe unsubscribe = {
- /*request_id=*/0,
- };
- stream_input->ReceiveMessage(unsubscribe);
- EXPECT_EQ(MoqtSessionPeer::GetSubscription(&session_, 0), nullptr);
-}
-
-TEST_F(MoqtSessionTest, SendDatagram) {
- FullTrackName ftn("foo", "bar");
- std::shared_ptr<MockTrackPublisher> track_publisher =
- SetupPublisher(ftn, MoqtForwardingPreference::kDatagram, Location{4, 0});
+ MockTrackPublisher* track = CreateTrackPublisher();
+ MoqtSubscribe request = DefaultSubscribe();
+ const MoqtPriority kLocalDefaultPriority = 0x20;
std::unique_ptr<MoqtBidiStreamTestWrapper> control_stream =
MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
- MoqtObjectListener* listener =
- MoqtSessionPeer::AddSubscription(&session_, track_publisher, 0, 2, 5, 0);
-
- // Publish in window.
- bool correct_message = false;
- uint8_t kExpectedMessage[] = {
- 0x05, 0x02, 0x05, 0x20, 0x03, 0x65, 0x78, 0x74,
- 0x64, 0x65, 0x61, 0x64, 0x62, 0x65, 0x65, 0x66, // "deadbeef"
- };
- EXPECT_CALL(mock_session_, SendOrQueueDatagram)
- .WillOnce([&](absl::string_view datagram) {
- if (datagram.size() == sizeof(kExpectedMessage)) {
- correct_message = (0 == memcmp(datagram.data(), kExpectedMessage,
- sizeof(kExpectedMessage)));
- }
- return webtransport::DatagramStatus(
- webtransport::DatagramStatusCode::kSuccess, "");
- });
- EXPECT_CALL(*track_publisher,
- GetCachedObject(5, std::optional<uint64_t>(), 0, 0))
- .WillRepeatedly([&] {
- return PublishedObject{
- PublishedObjectMetadata{Location{5, 0}, std::nullopt, "ext",
- MoqtObjectStatus::kNormal, 32, 8,
- MoqtSessionPeer::Now(&session_)},
- PayloadFromString("deadbeef")};
- });
- listener->OnNewObjectAvailable(Location(5, 0), std::nullopt, 32);
- EXPECT_TRUE(correct_message);
+ TrackExtensions extensions(std::nullopt, std::nullopt, kLocalDefaultPriority,
+ std::nullopt, std::nullopt, std::nullopt);
+ EXPECT_CALL(*track, extensions)
+ .WillRepeatedly(testing::ReturnRef(extensions));
+ MoqtObjectListener* listener = ReceiveSubscribeSynchronousOk(
+ track, request, control_stream.get(), /*track_alias=*/0, extensions);
+ MoqtUnsubscribe unsubscribe = {/*request_id=*/1};
+ EXPECT_CALL(*track, RemoveObjectListener(listener));
+ control_stream->ReceiveMessage(unsubscribe);
}
TEST_F(MoqtSessionTest, ReceiveDatagram) {
@@ -2312,233 +1502,39 @@
session_.OnDatagramReceived(absl::string_view(datagram, sizeof(datagram)));
}
-TEST_F(MoqtSessionTest, QueuedStreamsOpenedInOrder) {
- FullTrackName ftn("foo", "bar");
- auto track =
- SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(0, 0));
- std::unique_ptr<MoqtBidiStreamTestWrapper> control_stream =
- MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
- MoqtObjectListener* subscription =
- MoqtSessionPeer::AddSubscription(&session_, track, 0, 14, 0, 0);
- EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
- .WillOnce(Return(false))
- .WillOnce(Return(false))
- .WillOnce(Return(false));
- subscription->OnNewObjectAvailable(Location(1, 0), 0,
- kDefaultPublisherPriority);
- subscription->OnNewObjectAvailable(Location(0, 0), 0,
- kDefaultPublisherPriority);
- subscription->OnNewObjectAvailable(Location(2, 0), 0,
- kDefaultPublisherPriority);
- // These should be opened in the sequence (0, 0), (1, 0), (2, 0).
- EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
- .WillRepeatedly(Return(true));
- webtransport::test::MockStream mock_stream0, mock_stream1, mock_stream2;
- EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
- .WillOnce(Return(&mock_stream0))
- .WillOnce(Return(&mock_stream1))
- .WillOnce(Return(&mock_stream2));
- std::unique_ptr<webtransport::StreamVisitor> stream_visitor[3];
- EXPECT_CALL(mock_stream0, SetVisitor(_))
- .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
- stream_visitor[0] = std::move(visitor);
- });
- EXPECT_CALL(mock_stream1, SetVisitor(_))
- .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
- stream_visitor[1] = std::move(visitor);
- });
- EXPECT_CALL(mock_stream2, SetVisitor(_))
- .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
- stream_visitor[2] = std::move(visitor);
- });
- EXPECT_CALL(mock_stream0, GetStreamId()).WillRepeatedly(Return(0));
- EXPECT_CALL(mock_stream1, GetStreamId()).WillRepeatedly(Return(1));
- EXPECT_CALL(mock_stream2, GetStreamId()).WillRepeatedly(Return(2));
- EXPECT_CALL(mock_stream0, visitor()).WillOnce([&]() {
- return stream_visitor[0].get();
- });
- EXPECT_CALL(mock_stream1, visitor()).WillOnce([&]() {
- return stream_visitor[1].get();
- });
- EXPECT_CALL(mock_stream2, visitor()).WillOnce([&]() {
- return stream_visitor[2].get();
- });
- EXPECT_CALL(*track, GetCachedObject(0, Optional(0), 0, 0))
- .WillOnce(Return(PublishedObject{
- PublishedObjectMetadata{Location(0, 0), 0, "",
- MoqtObjectStatus::kNormal, 128, 6,
- MoqtSessionPeer::Now(&session_)},
- PayloadFromString("foobar")}));
- EXPECT_CALL(*track, GetCachedObject(0, Optional(0), 1, 0))
- .WillOnce(Return(std::nullopt));
- EXPECT_CALL(*track, GetCachedObject(1, Optional(0), 0, 0))
- .WillOnce(Return(PublishedObject{
- PublishedObjectMetadata{Location(1, 0), 0, "",
- MoqtObjectStatus::kNormal, 127, 8,
- MoqtSessionPeer::Now(&session_)},
- PayloadFromString("deadbeef")}));
- EXPECT_CALL(*track, GetCachedObject(1, Optional(0), 1, 0))
- .WillOnce(Return(std::nullopt));
- EXPECT_CALL(*track, GetCachedObject(2, Optional(0), 0, 0))
- .WillOnce(Return(PublishedObject{
- PublishedObjectMetadata{Location(2, 0), 0, "",
- MoqtObjectStatus::kNormal, 127, 8,
- MoqtSessionPeer::Now(&session_)},
- PayloadFromString("deadbeef")}));
- EXPECT_CALL(*track, GetCachedObject(2, Optional(0), 1, 0))
- .WillOnce(Return(std::nullopt));
- EXPECT_CALL(mock_stream0, CanWrite()).WillRepeatedly(Return(true));
- EXPECT_CALL(mock_stream1, CanWrite()).WillRepeatedly(Return(true));
- EXPECT_CALL(mock_stream2, CanWrite()).WillRepeatedly(Return(true));
- EXPECT_CALL(mock_stream0, Writev(_, _))
- .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
- const webtransport::StreamWriteOptions& options) {
- // The Group ID is the 3rd byte of the stream.
- EXPECT_EQ(static_cast<const uint8_t>(data[0].AsStringView()[2]), 0);
- return absl::OkStatus();
- });
- EXPECT_CALL(mock_stream1, Writev(_, _))
- .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
- const webtransport::StreamWriteOptions& options) {
- // The Group ID is the 3rd byte of the stream.
- EXPECT_EQ(static_cast<const uint8_t>(data[0].AsStringView()[2]), 1);
- return absl::OkStatus();
- });
- EXPECT_CALL(mock_stream2, Writev(_, _))
- .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
- const webtransport::StreamWriteOptions& options) {
- // The Group ID is the 3rd byte of the stream.
- EXPECT_EQ(static_cast<const uint8_t>(data[0].AsStringView()[2]), 2);
- return absl::OkStatus();
- });
- session_.OnCanCreateNewOutgoingUnidirectionalStream();
-}
-
-TEST_F(MoqtSessionTest, StreamQueuedForSubscriptionThatDoesntExist) {
- FullTrackName ftn("foo", "bar");
- auto track =
- SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(0, 0));
- std::unique_ptr<MoqtBidiStreamTestWrapper> control_stream =
- MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
- MoqtObjectListener* subscription =
- MoqtSessionPeer::AddSubscription(&session_, track, 0, 14, 0, 0);
- EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
- .WillOnce(Return(false));
- subscription->OnNewObjectAvailable(Location(0, 0), 0,
- kDefaultPublisherPriority);
-
- // Delete the subscription, then grant stream credit.
- MoqtSessionPeer::DeleteSubscription(&session_, 0);
- EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
- .WillRepeatedly(Return(true));
- EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream()).Times(0);
- session_.OnCanCreateNewOutgoingUnidirectionalStream();
-}
-
-TEST_F(MoqtSessionTest, QueuedStreamPriorityChanged) {
- FullTrackName ftn1("foo", "bar");
- auto track1 =
- SetupPublisher(ftn1, MoqtForwardingPreference::kSubgroup, Location(0, 0));
- FullTrackName ftn2("dead", "beef");
- auto track2 =
- SetupPublisher(ftn2, MoqtForwardingPreference::kSubgroup, Location(0, 0));
- std::unique_ptr<MoqtBidiStreamTestWrapper> control_stream =
- MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
- MoqtObjectListener* subscription0 =
- MoqtSessionPeer::AddSubscription(&session_, track1, 0, 14, 0, 0);
- MoqtObjectListener* subscription1 =
- MoqtSessionPeer::AddSubscription(&session_, track2, 1, 15, 0, 0);
- MoqtSessionPeer::UpdateSubscriberPriority(&session_, 0, 1);
- MoqtSessionPeer::UpdateSubscriberPriority(&session_, 1, 2);
-
- // Two published objects will queue four streams.
- EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
- .WillOnce(Return(false))
- .WillOnce(Return(false))
- .WillOnce(Return(false))
- .WillOnce(Return(false));
- subscription0->OnNewObjectAvailable(Location(0, 0), 0,
- kDefaultPublisherPriority);
- subscription1->OnNewObjectAvailable(Location(0, 0), 0,
- kDefaultPublisherPriority);
- subscription0->OnNewObjectAvailable(Location(1, 0), 0,
- kDefaultPublisherPriority);
- subscription1->OnNewObjectAvailable(Location(1, 0), 0,
- kDefaultPublisherPriority);
-
- // Allow one stream to be opened. It will be group 0, subscription 0.
+TEST_F(MoqtSessionTest, UpdateTrackPriority) {
+ session_.UpdateTrackPriority(0, std::nullopt, MoqtTrackPriority{0x40, 0x82});
+ EXPECT_EQ(MoqtSessionPeer::NextQueuedRequestIdToServer(&session_), 0);
+ // Same track, higher priority.
+ session_.UpdateTrackPriority(0, MoqtTrackPriority{0x40, 0x82},
+ MoqtTrackPriority{0x40, 0x80});
+ EXPECT_EQ(MoqtSessionPeer::NextQueuedRequestIdToServer(&session_), 0);
+ // New track, higher priority.
+ session_.UpdateTrackPriority(2, std::nullopt, MoqtTrackPriority{0x20, 0x82});
+ EXPECT_EQ(MoqtSessionPeer::NextQueuedRequestIdToServer(&session_), 2);
+ // Pop request ID 2 from the queue. The subscription doesn't really exist, so
+ // nothing else happens.
EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
.WillOnce(Return(true))
- .WillOnce(Return(true))
.WillOnce(Return(false));
- webtransport::test::MockStream mock_stream0;
- EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
- .WillOnce(Return(&mock_stream0));
- std::unique_ptr<webtransport::StreamVisitor> stream_visitor0;
- EXPECT_CALL(mock_stream0, SetVisitor(_))
- .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
- stream_visitor0 = std::move(visitor);
- });
- EXPECT_CALL(mock_stream0, GetStreamId()).WillRepeatedly(Return(0));
- EXPECT_CALL(mock_stream0, visitor()).WillOnce([&]() {
- return stream_visitor0.get();
- });
- EXPECT_CALL(*track1, GetCachedObject(0, Optional(0), 0, 0))
- .WillOnce(Return(PublishedObject{
- PublishedObjectMetadata{Location(0, 0), 0, "",
- MoqtObjectStatus::kNormal, 127, 6},
- PayloadFromString("foobar")}));
- EXPECT_CALL(*track1, GetCachedObject(0, Optional(0), 1, 0))
- .WillOnce(Return(std::nullopt));
- EXPECT_CALL(mock_stream0, CanWrite()).WillRepeatedly(Return(true));
- EXPECT_CALL(mock_stream0, Writev)
- .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
- const webtransport::StreamWriteOptions& options) {
- // Check track alias is 14.
- EXPECT_EQ(static_cast<const uint8_t>(data[0].AsStringView()[1]), 14);
- // Check Group ID is 0
- EXPECT_EQ(static_cast<const uint8_t>(data[0].AsStringView()[2]), 0);
- return absl::OkStatus();
- });
session_.OnCanCreateNewOutgoingUnidirectionalStream();
-
- // Raise the priority of subscription 1 and allow another stream. It will be
- // group 0, subscription 1.
- MoqtSessionPeer::UpdateSubscriberPriority(&session_, 1, 0);
+ EXPECT_EQ(MoqtSessionPeer::NextQueuedRequestIdToServer(&session_), 0);
+ // There's another stream for request ID 2.
+ session_.UpdateTrackPriority(2, std::nullopt, MoqtTrackPriority{0x20, 0x81});
+ EXPECT_EQ(MoqtSessionPeer::NextQueuedRequestIdToServer(&session_), 2);
+ // The subscriber demotes track 2. Track 0 is first now due to higher
+ // publisher priority.
+ session_.UpdateTrackPriority(2, MoqtTrackPriority{0x20, 0x81},
+ MoqtTrackPriority{0x40, 0x81});
+ EXPECT_EQ(MoqtSessionPeer::NextQueuedRequestIdToServer(&session_), 0);
EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
.WillOnce(Return(true))
- .WillOnce(Return(true))
- .WillRepeatedly(Return(false));
- webtransport::test::MockStream mock_stream1;
- EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
- .WillOnce(Return(&mock_stream1));
- std::unique_ptr<webtransport::StreamVisitor> stream_visitor1;
- EXPECT_CALL(mock_stream1, SetVisitor(_))
- .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
- stream_visitor1 = std::move(visitor);
- });
- EXPECT_CALL(mock_stream1, GetStreamId()).WillRepeatedly(Return(1));
- EXPECT_CALL(mock_stream1, visitor()).WillOnce([&]() {
- return stream_visitor1.get();
- });
- EXPECT_CALL(*track2, GetCachedObject(0, Optional(0), 0, 0))
- .WillOnce(Return(PublishedObject{
- PublishedObjectMetadata{Location(0, 0), 0, "",
- MoqtObjectStatus::kNormal, 127, 8},
- PayloadFromString("deadbeef")}));
- EXPECT_CALL(*track2, GetCachedObject(0, Optional(0), 1, 0))
- .WillOnce(Return(std::nullopt));
- EXPECT_CALL(mock_stream1, CanWrite()).WillRepeatedly(Return(true));
- EXPECT_CALL(mock_stream1, Writev(_, _))
- .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
- const webtransport::StreamWriteOptions& options) {
- // Check track alias is 15.
- EXPECT_EQ(static_cast<const uint8_t>(data[0].AsStringView()[1]), 15);
- // Check Group ID is 0
- EXPECT_EQ(static_cast<const uint8_t>(data[0].AsStringView()[2]), 0);
- return absl::OkStatus();
- });
+ .WillOnce(Return(false));
session_.OnCanCreateNewOutgoingUnidirectionalStream();
+ // The subscription will update with the first stream. It's lower priority
+ // than request ID 2.
+ session_.UpdateTrackPriority(0, std::nullopt, MoqtTrackPriority{0x40, 0x82});
+ EXPECT_EQ(MoqtSessionPeer::NextQueuedRequestIdToServer(&session_), 2);
}
// Helper functions to handle the many EXPECT_CALLs for FETCH processing and
@@ -2887,14 +1883,8 @@
SetLargestId(track, Location(4, 10));
ReceiveSubscribeSynchronousOk(track, subscribe, stream_input.get());
- MoqtObjectListener* subscription =
- MoqtSessionPeer::GetSubscription(&session_, subscribe.request_id);
- ASSERT_NE(subscription, nullptr);
- EXPECT_TRUE(
- MoqtSessionPeer::InSubscriptionWindow(subscription, Location(4, 11)));
- EXPECT_FALSE(
- MoqtSessionPeer::InSubscriptionWindow(subscription, Location(4, 10)));
-
+ ASSERT_TRUE(MoqtSessionPeer::RequestIdIsSubscriptionPublisher(
+ &session_, subscribe.request_id));
MoqtFetch fetch = DefaultFetch();
fetch.request_id = 3;
fetch.fetch = JoiningFetchRelative(1, 2);
@@ -2914,14 +1904,8 @@
SetLargestId(track, Location(4, 10));
ReceiveSubscribeSynchronousOk(track, subscribe, stream_input.get());
- MoqtObjectListener* subscription =
- MoqtSessionPeer::GetSubscription(&session_, subscribe.request_id);
- ASSERT_NE(subscription, nullptr);
- EXPECT_TRUE(
- MoqtSessionPeer::InSubscriptionWindow(subscription, Location(4, 11)));
- EXPECT_FALSE(
- MoqtSessionPeer::InSubscriptionWindow(subscription, Location(4, 10)));
-
+ ASSERT_TRUE(MoqtSessionPeer::RequestIdIsSubscriptionPublisher(
+ &session_, subscribe.request_id));
MoqtFetch fetch = DefaultFetch();
fetch.request_id = 3;
fetch.fetch = JoiningFetchAbsolute(1, 2);
@@ -3438,265 +2422,10 @@
MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
MockTrackPublisher* track = CreateTrackPublisher();
ReceiveSubscribeSynchronousOk(track, request, control_stream.get());
-
- MoqtObjectListener* subscription =
- MoqtSessionPeer::GetSubscription(&session_, 1);
- ASSERT_NE(subscription, nullptr);
- EXPECT_EQ(MoqtSessionPeer::GetDeliveryTimeout(subscription),
- quic::QuicTimeDelta::FromSeconds(1));
-}
-
-TEST_F(MoqtSessionTest, DeliveryTimeoutExpiredOnArrival) {
- auto track_publisher =
- std::make_shared<MockTrackPublisher>(FullTrackName("foo", "bar"));
- std::unique_ptr<MoqtBidiStreamTestWrapper> control_stream =
- MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
- MoqtObjectListener* subscription =
- MoqtSessionPeer::AddSubscription(&session_, track_publisher, 1, 2, 0, 0);
- ASSERT_NE(subscription, nullptr);
- MoqtSessionPeer::SetDeliveryTimeout(subscription,
- quic::QuicTimeDelta::FromSeconds(1));
-
- webtransport::test::MockStream data_mock;
- EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
- .WillOnce(Return(true));
- EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
- .WillOnce(Return(&data_mock));
- EXPECT_CALL(data_mock, GetStreamId())
- .WillRepeatedly(Return(kOutgoingUniStreamId));
- std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
- EXPECT_CALL(data_mock, SetVisitor(_))
- .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
- stream_visitor = std::move(visitor);
- });
- EXPECT_CALL(data_mock, CanWrite()).WillRepeatedly(Return(true));
- EXPECT_CALL(data_mock, visitor()).WillRepeatedly([&]() {
- return stream_visitor.get();
- });
- EXPECT_CALL(*track_publisher, GetCachedObject)
- .WillOnce(Return(
- PublishedObject{PublishedObjectMetadata{
- Location(0, 0),
- 0,
- "",
- MoqtObjectStatus::kObjectDoesNotExist,
- 0,
- 0,
- MoqtSessionPeer::Now(&session_) -
- quic::QuicTimeDelta::FromSeconds(1),
- },
- std::vector<quiche::QuicheMemSlice>(), false}));
- EXPECT_CALL(data_mock, ResetWithUserCode(kResetCodeDeliveryTimeout))
- .WillOnce([&](webtransport::StreamErrorCode /*error*/) {
- stream_visitor.reset();
- });
- // Arrival time is very old; reset immediately.
- ON_CALL(*track_publisher, largest_location)
- .WillByDefault(Return(Location(0, 0)));
- subscription->OnNewObjectAvailable(Location(0, 0), 0,
- kDefaultPublisherPriority);
- // Subsequent objects for that subgroup are ignored.
- EXPECT_CALL(*track_publisher, GetCachedObject).Times(0);
- EXPECT_CALL(mock_session_, GetStreamById(_)).Times(0);
- EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
- .Times(0);
- ON_CALL(*track_publisher, largest_location)
- .WillByDefault(Return(Location(0, 1)));
- subscription->OnNewObjectAvailable(Location(0, 1), 0,
- kDefaultPublisherPriority);
- // Check that reset_subgroups_ is pruned.
- EXPECT_TRUE(MoqtSessionPeer::SubgroupHasBeenReset(subscription,
- DataStreamIndex(0, 0)));
- subscription->OnGroupAbandoned(0);
- EXPECT_FALSE(MoqtSessionPeer::SubgroupHasBeenReset(subscription,
- DataStreamIndex(0, 0)));
-}
-
-TEST_F(MoqtSessionTest, DeliveryTimeoutAfterIntegratedFin) {
- auto track_publisher =
- std::make_shared<MockTrackPublisher>(FullTrackName("foo", "bar"));
- std::unique_ptr<MoqtBidiStreamTestWrapper> control_stream =
- MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
- MoqtObjectListener* subscription =
- MoqtSessionPeer::AddSubscription(&session_, track_publisher, 1, 2, 0, 0);
- ASSERT_NE(subscription, nullptr);
- MoqtSessionPeer::SetDeliveryTimeout(subscription,
- quic::QuicTimeDelta::FromSeconds(1));
-
- webtransport::test::MockStream data_mock;
- EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
- .WillOnce(Return(true));
- EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
- .WillOnce(Return(&data_mock));
- EXPECT_CALL(data_mock, GetStreamId())
- .WillRepeatedly(Return(kOutgoingUniStreamId));
- std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
- EXPECT_CALL(data_mock, SetVisitor(_))
- .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
- stream_visitor = std::move(visitor);
- });
- EXPECT_CALL(data_mock, CanWrite()).WillRepeatedly(Return(true));
- EXPECT_CALL(data_mock, visitor()).WillRepeatedly([&]() {
- return stream_visitor.get();
- });
- EXPECT_CALL(*track_publisher, GetCachedObject)
- .WillOnce(Return(PublishedObject{
- PublishedObjectMetadata{Location(0, 0), 0, "",
- MoqtObjectStatus::kObjectDoesNotExist, 0, 0,
- MoqtSessionPeer::Now(&session_)},
- PayloadFromString(""), true}))
- .WillOnce(Return(std::nullopt));
- EXPECT_CALL(data_mock, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
- EXPECT_CALL(data_mock, ResetWithUserCode(kResetCodeDeliveryTimeout)).Times(0);
- ON_CALL(*track_publisher, largest_location)
- .WillByDefault(Return(Location(0, 0)));
- subscription->OnNewObjectAvailable(Location(0, 0), 0,
- kDefaultPublisherPriority);
- auto* delivery_alarm =
- absl::down_cast<quic::test::MockAlarmFactory::TestAlarm*>(
- MoqtSessionPeer::GetAlarm(stream_visitor.get()));
- EXPECT_CALL(data_mock, ResetWithUserCode(kResetCodeDeliveryTimeout))
- .WillOnce([&](webtransport::StreamErrorCode /*error*/) {
- stream_visitor.reset();
- });
- delivery_alarm->Fire();
-}
-
-TEST_F(MoqtSessionTest, DeliveryTimeoutAfterSeparateFin) {
- auto track_publisher =
- std::make_shared<MockTrackPublisher>(FullTrackName("foo", "bar"));
- std::unique_ptr<MoqtBidiStreamTestWrapper> control_stream =
- MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
- MoqtObjectListener* subscription =
- MoqtSessionPeer::AddSubscription(&session_, track_publisher, 1, 2, 0, 0);
- ASSERT_NE(subscription, nullptr);
- MoqtSessionPeer::SetDeliveryTimeout(subscription,
- quic::QuicTimeDelta::FromSeconds(1));
-
- webtransport::test::MockStream data_mock;
- EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
- .WillOnce(Return(true));
- EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
- .WillOnce(Return(&data_mock));
- EXPECT_CALL(data_mock, GetStreamId())
- .WillRepeatedly(Return(kOutgoingUniStreamId));
- EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
- .WillRepeatedly(Return(&data_mock));
- std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
- EXPECT_CALL(data_mock, SetVisitor(_))
- .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
- stream_visitor = std::move(visitor);
- });
- EXPECT_CALL(data_mock, CanWrite()).WillRepeatedly(Return(true));
- EXPECT_CALL(data_mock, visitor()).WillRepeatedly([&]() {
- return stream_visitor.get();
- });
- EXPECT_CALL(*track_publisher, GetCachedObject)
- .WillOnce(Return(PublishedObject{
- PublishedObjectMetadata{Location(0, 0), 0, "",
- MoqtObjectStatus::kObjectDoesNotExist, 0, 0,
- MoqtSessionPeer::Now(&session_)},
- PayloadFromString(""), false}))
- .WillOnce(Return(std::nullopt));
- EXPECT_CALL(data_mock, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
- ON_CALL(*track_publisher, largest_location())
- .WillByDefault(Return(Location(0, 0)));
- subscription->OnNewObjectAvailable(Location(0, 0), 0,
- kDefaultPublisherPriority);
-
- EXPECT_CALL(data_mock, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
- subscription->OnNewFinAvailable(Location(0, 0), 0);
- auto* delivery_alarm =
- absl::down_cast<quic::test::MockAlarmFactory::TestAlarm*>(
- MoqtSessionPeer::GetAlarm(stream_visitor.get()));
- EXPECT_CALL(data_mock, ResetWithUserCode(kResetCodeDeliveryTimeout))
- .WillOnce([&](webtransport::StreamErrorCode /*error*/) {
- stream_visitor.reset();
- });
- delivery_alarm->Fire();
-}
-
-TEST_F(MoqtSessionTest, DeliveryTimeoutAlternateDesign) {
- session_.UseAlternateDeliveryTimeout();
- auto track_publisher =
- std::make_shared<MockTrackPublisher>(FullTrackName("foo", "bar"));
- std::unique_ptr<MoqtBidiStreamTestWrapper> control_stream =
- MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
- MoqtObjectListener* subscription =
- MoqtSessionPeer::AddSubscription(&session_, track_publisher, 1, 2, 0, 0);
- ASSERT_NE(subscription, nullptr);
- MoqtSessionPeer::SetDeliveryTimeout(subscription,
- quic::QuicTimeDelta::FromSeconds(1));
-
- webtransport::test::MockStream data_mock1;
- EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
- .WillRepeatedly(Return(true));
- EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
- .WillOnce(Return(&data_mock1));
- EXPECT_CALL(data_mock1, GetStreamId())
- .WillRepeatedly(Return(kOutgoingUniStreamId));
- EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
- .WillRepeatedly(Return(&data_mock1));
- std::unique_ptr<webtransport::StreamVisitor> stream_visitor1;
- EXPECT_CALL(data_mock1, SetVisitor(_))
- .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
- stream_visitor1 = std::move(visitor);
- });
- EXPECT_CALL(data_mock1, CanWrite()).WillRepeatedly(Return(true));
- EXPECT_CALL(data_mock1, visitor()).WillRepeatedly([&]() {
- return stream_visitor1.get();
- });
- EXPECT_CALL(*track_publisher, GetCachedObject)
- .WillOnce(Return(PublishedObject{
- PublishedObjectMetadata{Location(0, 0), 0, "",
- MoqtObjectStatus::kObjectDoesNotExist, 0, 0,
- MoqtSessionPeer::Now(&session_)},
- PayloadFromString(""), false}))
- .WillOnce(Return(std::nullopt));
- EXPECT_CALL(data_mock1, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
- ON_CALL(*track_publisher, largest_location)
- .WillByDefault(Return(Location(0, 0)));
- subscription->OnNewObjectAvailable(Location(0, 0), 0,
- kDefaultPublisherPriority);
-
- webtransport::test::MockStream data_mock2;
- EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
- .WillOnce(Return(&data_mock2));
- EXPECT_CALL(data_mock2, GetStreamId())
- .WillRepeatedly(Return(kOutgoingUniStreamId + 4));
- EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId + 4))
- .WillRepeatedly(Return(&data_mock2));
- std::unique_ptr<webtransport::StreamVisitor> stream_visitor2;
- EXPECT_CALL(data_mock2, SetVisitor(_))
- .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
- stream_visitor2 = std::move(visitor);
- });
- EXPECT_CALL(data_mock2, CanWrite()).WillRepeatedly(Return(true));
- EXPECT_CALL(data_mock2, visitor()).WillRepeatedly([&]() {
- return stream_visitor2.get();
- });
- EXPECT_CALL(*track_publisher, GetCachedObject)
- .WillOnce(Return(PublishedObject{
- PublishedObjectMetadata{Location(1, 0), 0, "",
- MoqtObjectStatus::kObjectDoesNotExist, 0, 0,
- MoqtSessionPeer::Now(&session_)},
- PayloadFromString(""), false}))
- .WillOnce(Return(std::nullopt));
- EXPECT_CALL(data_mock2, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
- ON_CALL(*track_publisher, largest_location)
- .WillByDefault(Return(Location(1, 0)));
- subscription->OnNewObjectAvailable(Location(1, 0), 0,
- kDefaultPublisherPriority);
-
- // Group 1 should start the timer on the Group 0 stream.
- auto* delivery_alarm =
- absl::down_cast<quic::test::MockAlarmFactory::TestAlarm*>(
- MoqtSessionPeer::GetAlarm(stream_visitor1.get()));
- EXPECT_CALL(data_mock1, ResetWithUserCode(kResetCodeDeliveryTimeout))
- .WillOnce([&](webtransport::StreamErrorCode /*error*/) {
- stream_visitor1.reset();
- });
- delivery_alarm->Fire();
+ std::optional<quic::QuicTimeDelta> delivery_timeout =
+ MoqtSessionPeer::GetDeliveryTimeout(&session_, request.request_id);
+ EXPECT_TRUE(delivery_timeout.has_value() &&
+ *delivery_timeout == quic::QuicTimeDelta::FromSeconds(1));
}
TEST_F(MoqtSessionTest, ReceiveGoAwayEnforcement) {
@@ -4402,45 +3131,23 @@
control_stream->ReceiveMessage(MoqtNamespaceDone());
}
-TEST_F(MoqtSessionTest, IncomingRequestUpdateTruncatesSubscription) {
- FullTrackName ftn("foo", "bar");
- std::shared_ptr<MoqtTrackPublisher> publisher =
- SetupPublisher(ftn, MoqtForwardingPreference::kDatagram, Location(8, 0));
- MockTrackPublisher* mock_publisher =
- absl::down_cast<MockTrackPublisher*>(publisher.get());
+TEST_F(MoqtSessionTest, IncomingRequestUpdateTriggersRequestOk) {
+ MoqtSubscribe subscribe = DefaultSubscribe();
+ MockTrackPublisher* track = CreateTrackPublisher();
std::unique_ptr<MoqtBidiStreamTestWrapper> control_stream =
MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
- MoqtObjectListener* listener =
- MoqtSessionPeer::AddSubscription(&session_, publisher, /*request_id=*/1,
- /*track_alias=*/2, /*start_group=*/4,
- /*start_object=*/0);
-
- // Send a datagram in window.
- EXPECT_CALL(*mock_publisher,
- GetCachedObject(8, std::optional<uint64_t>(), 0, 0))
- .WillOnce([&] {
- return PublishedObject{
- PublishedObjectMetadata{Location(8, 0), std::nullopt, "extensions",
- MoqtObjectStatus::kNormal, 128, 8,
- MoqtSessionPeer::Now(&session_)},
- PayloadFromString("deadbeef"), false};
- });
- EXPECT_CALL(mock_session_, SendOrQueueDatagram)
- .WillOnce(Return(webtransport::DatagramStatus(
- webtransport::DatagramStatusCode::kSuccess, "")));
-
- listener->OnNewObjectAvailable(Location(8, 0), std::nullopt, 0x80);
-
- // Update the filter to exclude the live edge. The next object is out of
- // window.
- MessageParameters parameters;
- parameters.subscription_filter.emplace(Location(4, 0), 7);
+ ReceiveSubscribeSynchronousOk(track, subscribe, control_stream.get(), 0);
EXPECT_CALL(mock_stream_,
Writev(ControlMessageOfType(MoqtMessageType::kRequestOk), _));
- control_stream->ReceiveMessage(MoqtRequestUpdate{3, 1, parameters});
- EXPECT_CALL(*mock_publisher, GetCachedObject).Times(0);
- EXPECT_CALL(mock_session_, SendOrQueueDatagram).Times(0);
- listener->OnNewObjectAvailable(Location(8, 1), 0, 0x80);
+ control_stream->ReceiveMessage(MoqtRequestUpdate{3, 1, MessageParameters()});
+}
+
+TEST_F(MoqtSessionTest, IncomingRequestUpdateTriggersRequestError) {
+ std::unique_ptr<MoqtBidiStreamTestWrapper> control_stream =
+ MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
+ EXPECT_CALL(mock_stream_,
+ Writev(ControlMessageOfType(MoqtMessageType::kRequestError), _));
+ control_stream->ReceiveMessage(MoqtRequestUpdate{3, 1, MessageParameters()});
}
TEST_F(MoqtSessionTest, StopSendingBlocksSubgroup) {
diff --git a/quiche/quic/moqt/moqt_subscription.cc b/quiche/quic/moqt/moqt_subscription.cc
index 29b7e7f..c62d3e0 100644
--- a/quiche/quic/moqt/moqt_subscription.cc
+++ b/quiche/quic/moqt/moqt_subscription.cc
@@ -75,6 +75,8 @@
void SubscriptionPublisher::Update(const MessageParameters& parameters) {
// TODO(martinduke): If there are auth tokens, this probably has to go to the
// application.
+ // TODO(martinduke): If the subscribe window has shrunk, close any streams
+ // that are now outside the window. Also send PUBLISH_DONE if now done.
MoqtPriority old_priority =
parameters_.subscriber_priority.value_or(kDefaultSubscriberPriority);
parameters_.Update(parameters);
@@ -402,27 +404,30 @@
}
void SubscriptionPublisher::OnCanCreateNewUniStream() {
- auto it = pending_streams_.rbegin();
- while (it != pending_streams_.rend() &&
- (it->second.index.group < first_active_group_ ||
- reset_subgroups_.contains(it->second.index))) {
+ while (visitor_->session() != nullptr &&
+ visitor_->session()->CanOpenNextOutgoingUnidirectionalStream()) {
+ auto it = pending_streams_.rbegin();
+ while (it != pending_streams_.rend() &&
+ (it->second.index.group < first_active_group_ ||
+ reset_subgroups_.contains(it->second.index))) {
+ pending_streams_.erase(--(it.base()));
+ it = pending_streams_.rbegin();
+ }
+ if (it == pending_streams_.rend()) {
+ return;
+ }
+ if (OpenDataStream(it->second) == nullptr) {
+ return;
+ }
pending_streams_.erase(--(it.base()));
- it = pending_streams_.rbegin();
- }
- if (it == pending_streams_.rend()) {
- return;
- }
- if (OpenDataStream(it->second) == nullptr) {
- return;
- }
- pending_streams_.erase(--(it.base()));
- if (!pending_streams_.empty()) {
- visitor_->UpdateTrackPriority(
- request_id_, std::nullopt,
- MoqtTrackPriority{
- subscriber_priority(),
- pending_streams_.rbegin()->second.publisher_priority.value_or(
- default_publisher_priority())});
+ if (!pending_streams_.empty()) {
+ visitor_->UpdateTrackPriority(
+ request_id_, std::nullopt,
+ MoqtTrackPriority{
+ subscriber_priority(),
+ pending_streams_.rbegin()->second.publisher_priority.value_or(
+ default_publisher_priority())});
+ }
}
}
diff --git a/quiche/quic/moqt/moqt_subscription.h b/quiche/quic/moqt/moqt_subscription.h
index 37dee22..38a314a 100644
--- a/quiche/quic/moqt/moqt_subscription.h
+++ b/quiche/quic/moqt/moqt_subscription.h
@@ -35,7 +35,7 @@
namespace test {
class SubscriptionPublisherPeer;
-}
+} // namespace test
// This is the part of the send order useful for ranking streams within the
// subscription. It sets the subscriber_priority to kDefaultSubscriberPriority
diff --git a/quiche/quic/moqt/moqt_subscription_test.cc b/quiche/quic/moqt/moqt_subscription_test.cc
index 520e709..89045f3 100644
--- a/quiche/quic/moqt/moqt_subscription_test.cc
+++ b/quiche/quic/moqt/moqt_subscription_test.cc
@@ -11,7 +11,9 @@
#include <string>
#include <utility>
+#include "absl/base/casts.h"
#include "absl/base/nullability.h"
+#include "absl/container/flat_hash_set.h"
#include "absl/status/status.h"
#include "absl/strings/match.h"
#include "absl/strings/string_view.h"
@@ -31,6 +33,7 @@
#include "quiche/quic/moqt/moqt_session_interface.h"
#include "quiche/quic/moqt/moqt_trace_recorder.h"
#include "quiche/quic/moqt/moqt_types.h"
+#include "quiche/quic/moqt/moqt_uni_stream.h"
#include "quiche/quic/moqt/test_tools/moqt_framer_utils.h"
#include "quiche/quic/moqt/test_tools/moqt_mock_visitor.h"
#include "quiche/quic/moqt/test_tools/moqt_session_peer.h"
@@ -43,6 +46,21 @@
namespace moqt::test {
+class SubscriptionPublisherPeer {
+ public:
+ static size_t num_open_streams(SubscriptionPublisher* publisher) {
+ return publisher->stream_map_.GetAllStreams().size();
+ }
+ static std::optional<Location> largest_sent(
+ const SubscriptionPublisher* publisher) {
+ return publisher->largest_sent_;
+ }
+ static const absl::flat_hash_set<DataStreamIndex>& reset_subgroups(
+ const SubscriptionPublisher* publisher) {
+ return publisher->reset_subgroups_;
+ }
+};
+
namespace {
using ::testing::_;
@@ -111,7 +129,7 @@
parameters_.group_order = MoqtDeliveryOrder::kAscending;
EXPECT_CALL(monitoring_interface_, OnObjectAckSupportKnown)
.Times(AtLeast(0));
- ON_CALL(visitor_, session).WillByDefault(Return(&webtrans_));
+ EXPECT_CALL(visitor_, session).WillRepeatedly(Return(&webtrans_));
publisher_ = std::make_unique<SubscriptionPublisher>(
framer_, track_publisher_, &bidi_stream_, kRequestId, kTrackAlias,
parameters_, &visitor_, &monitoring_interface_, &mock_clock_,
@@ -119,6 +137,7 @@
ON_CALL(visitor_, alternate_delivery_timeout).WillByDefault(Return(false));
ON_CALL(webtrans_, GetStreamById(kStreamId))
.WillByDefault(Return(&mock_uni_stream_));
+ ON_CALL(visitor_, alarm_factory).WillByDefault(Return(&alarm_factory_));
}
~SubscriptionPublisherTest() override {
@@ -164,7 +183,7 @@
uni_stream_ = std::move(visitor);
});
EXPECT_CALL(mock_uni_stream_, SetPriority);
- EXPECT_CALL(mock_uni_stream_, visitor()).WillRepeatedly([&]() {
+ ON_CALL(mock_uni_stream_, visitor()).WillByDefault([&]() {
return uni_stream_.get();
});
EXPECT_CALL(mock_uni_stream_, CanWrite()).WillRepeatedly(Return(true));
@@ -192,7 +211,7 @@
EXPECT_CALL(webtrans_, CanOpenNextOutgoingUnidirectionalStream())
.WillOnce(Return(false));
EXPECT_CALL(visitor_,
- UpdateTrackPriority(1, std::optional<MoqtTrackPriority>(),
+ UpdateTrackPriority(1, _,
MoqtTrackPriority{subscriber_priority(),
publisher_priority}));
publisher_->OnNewObjectAvailable(location, subgroup, publisher_priority);
@@ -206,7 +225,7 @@
MoqtControlMessageParser message_parser_{kDefaultMoqtVersion, true};
webtransport::test::MockSession webtrans_;
StrictMock<webtransport::test::MockStream> mock_bidi_stream_;
- StrictMock<webtransport::test::MockStream> mock_uni_stream_;
+ webtransport::test::MockStream mock_uni_stream_;
std::shared_ptr<MockTrackPublisher> track_publisher_;
TestMoqtBidiStream bidi_stream_;
std::unique_ptr<webtransport::StreamVisitor> uni_stream_;
@@ -218,7 +237,7 @@
const TrackExtensions extensions_;
quic::MockClock mock_clock_;
MoqtSessionCallbacks callbacks_;
- quic::test::MockAlarmFactory alarm_factory_;
+ quic::test::TestAlarmFactory alarm_factory_;
int open_streams_ = 0;
};
@@ -395,7 +414,7 @@
publisher_->OnNewFinAvailable(Location(1, 0), 0);
}
-TEST_F(SubscriptionPublisherTest, OnSubgroupAbandoned) {
+TEST_F(SubscriptionPublisherTest, OnSubgroupAbandonedNoEffect) {
// Not in window
MessageParameters params;
params.subscription_filter = SubscriptionFilter(Location(10, 0), 10);
@@ -426,14 +445,16 @@
}
TEST_F(SubscriptionPublisherTest, OnGroupAbandonedWithStreams) {
- // Create a stream
+ // The delivery timeout is not infinite, so it will not send a PUBLISH_DONE
+ // with kTooFarBehind.
CreateStream(Location(1, 0), 0, 128);
EXPECT_CALL(mock_uni_stream_, ResetWithUserCode);
+ EXPECT_CALL(mock_bidi_stream_, Writev).Times(0); // No PUBLISH_DONE.
publisher_->OnGroupAbandoned(1);
}
TEST_F(SubscriptionPublisherTest, OnGroupAbandonedTooFarBehind) {
- // Create a pending stream
+ // Set the delivery timeout to infinite so that TooFarBehind is possible.
parameters_.delivery_timeout = quic::QuicTimeDelta::Infinite();
publisher_->Update(parameters_);
CreateStream(Location(5, 0), 0, 128);
@@ -455,7 +476,8 @@
// Abandon the group.
publisher_->OnGroupAbandoned(1);
// OnCanCreateNewUniStream should clean it up; no attempt to create a stream.
- EXPECT_CALL(webtrans_, CanOpenNextOutgoingUnidirectionalStream()).Times(0);
+ EXPECT_CALL(webtrans_, CanOpenNextOutgoingUnidirectionalStream)
+ .WillOnce(Return(true));
EXPECT_CALL(webtrans_, OpenOutgoingUnidirectionalStream).Times(0);
publisher_->OnCanCreateNewUniStream();
}
@@ -501,7 +523,7 @@
EXPECT_CALL(mock_uni_stream_, GetStreamId())
.WillRepeatedly(Return(kStreamId));
EXPECT_CALL(webtrans_, CanOpenNextOutgoingUnidirectionalStream())
- .WillOnce(Return(true));
+ .WillRepeatedly(Return(true));
EXPECT_CALL(webtrans_, OpenOutgoingUnidirectionalStream)
.WillOnce(Return(&mock_uni_stream_));
EXPECT_CALL(mock_uni_stream_, SetVisitor)
@@ -523,6 +545,94 @@
publisher_->OnCanCreateNewUniStream();
}
+TEST_F(SubscriptionPublisherTest, PendingStreamsInOrder) {
+ CreatePendingStream(Location(1, 0), 0, 128);
+ CreatePendingStream(Location(0, 0), 0, 128);
+ CreatePendingStream(Location(2, 0), 0, 127);
+ // Should be opened in the order (2, 0), (0, 0), (1, 0),
+ // Open stream and send (2, 0).
+ EXPECT_CALL(webtrans_, CanOpenNextOutgoingUnidirectionalStream())
+ .WillOnce(Return(true))
+ .WillOnce(Return(true))
+ .WillOnce(Return(false));
+ EXPECT_CALL(webtrans_, OpenOutgoingUnidirectionalStream)
+ .WillOnce(Return(&mock_uni_stream_));
+ EXPECT_CALL(mock_uni_stream_, GetStreamId).WillRepeatedly(Return(kStreamId));
+ std::unique_ptr<webtransport::StreamVisitor> stream_visitor2;
+ EXPECT_CALL(mock_uni_stream_, SetVisitor)
+ .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
+ stream_visitor2 = std::move(visitor);
+ });
+ EXPECT_CALL(mock_uni_stream_, visitor()).WillRepeatedly([&]() {
+ return stream_visitor2.get();
+ });
+ EXPECT_CALL(mock_uni_stream_, SetPriority);
+ EXPECT_CALL(*track_publisher_,
+ GetCachedObject(2, std::optional<uint64_t>(0), 0, 0))
+ .WillOnce(Return(DefaultPublishedObject(Location(2, 0), 0, 127)));
+ EXPECT_CALL(*track_publisher_,
+ GetCachedObject(2, std::optional<uint64_t>(0), 1, 0))
+ .WillOnce(Return(std::nullopt));
+ EXPECT_CALL(mock_uni_stream_, CanWrite).WillRepeatedly(Return(true));
+ EXPECT_CALL(mock_uni_stream_, Writev).WillOnce(Return(absl::OkStatus()));
+ EXPECT_CALL(visitor_, UpdateTrackPriority);
+ publisher_->OnCanCreateNewUniStream();
+ // Open (0, 0)
+ EXPECT_CALL(webtrans_, CanOpenNextOutgoingUnidirectionalStream())
+ .WillOnce(Return(true))
+ .WillOnce(Return(true))
+ .WillOnce(Return(false));
+ EXPECT_CALL(webtrans_, OpenOutgoingUnidirectionalStream)
+ .WillOnce(Return(&mock_uni_stream_));
+ EXPECT_CALL(mock_uni_stream_, GetStreamId)
+ .WillRepeatedly(Return(kStreamId + 4));
+ std::unique_ptr<webtransport::StreamVisitor> stream_visitor0;
+ EXPECT_CALL(mock_uni_stream_, SetVisitor)
+ .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
+ stream_visitor0 = std::move(visitor);
+ });
+ EXPECT_CALL(mock_uni_stream_, visitor()).WillRepeatedly([&]() {
+ return stream_visitor0.get();
+ });
+ EXPECT_CALL(mock_uni_stream_, SetPriority);
+ EXPECT_CALL(*track_publisher_,
+ GetCachedObject(0, std::optional<uint64_t>(0), 0, 0))
+ .WillOnce(Return(DefaultPublishedObject(Location(0, 0), 0, 128)));
+ EXPECT_CALL(*track_publisher_,
+ GetCachedObject(0, std::optional<uint64_t>(0), 1, 0))
+ .WillOnce(Return(std::nullopt));
+ EXPECT_CALL(mock_uni_stream_, CanWrite).WillRepeatedly(Return(true));
+ EXPECT_CALL(mock_uni_stream_, Writev).WillOnce(Return(absl::OkStatus()));
+ EXPECT_CALL(visitor_, UpdateTrackPriority);
+ publisher_->OnCanCreateNewUniStream();
+ // Open (1, 0)
+ EXPECT_CALL(webtrans_, CanOpenNextOutgoingUnidirectionalStream())
+ .WillRepeatedly(Return(true)); // Unlimited credit but only one stream.
+ EXPECT_CALL(webtrans_, OpenOutgoingUnidirectionalStream)
+ .WillOnce(Return(&mock_uni_stream_));
+ EXPECT_CALL(mock_uni_stream_, GetStreamId)
+ .WillRepeatedly(Return(kStreamId + 8));
+ std::unique_ptr<webtransport::StreamVisitor> stream_visitor1;
+ EXPECT_CALL(mock_uni_stream_, SetVisitor)
+ .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
+ stream_visitor1 = std::move(visitor);
+ });
+ EXPECT_CALL(mock_uni_stream_, visitor()).WillRepeatedly([&]() {
+ return stream_visitor1.get();
+ });
+ EXPECT_CALL(mock_uni_stream_, SetPriority);
+ EXPECT_CALL(*track_publisher_,
+ GetCachedObject(1, std::optional<uint64_t>(0), 0, 0))
+ .WillOnce(Return(DefaultPublishedObject(Location(1, 0), 0, 128)));
+ EXPECT_CALL(*track_publisher_,
+ GetCachedObject(1, std::optional<uint64_t>(0), 1, 0))
+ .WillOnce(Return(std::nullopt));
+ EXPECT_CALL(mock_uni_stream_, CanWrite).WillRepeatedly(Return(true));
+ EXPECT_CALL(mock_uni_stream_, Writev).WillOnce(Return(absl::OkStatus()));
+ EXPECT_CALL(visitor_, UpdateTrackPriority).Times(0);
+ publisher_->OnCanCreateNewUniStream();
+}
+
TEST_F(SubscriptionPublisherTest, OnDataStreamDestroyed) {
CreateStream(Location(1, 0), 0, 128);
DataStreamIndex index(1, 0);
@@ -541,6 +651,51 @@
Location(1, 0));
}
+TEST_F(SubscriptionPublisherTest, AlternateDeliveryTimeout) {
+ EXPECT_CALL(visitor_, alternate_delivery_timeout)
+ .WillRepeatedly(Return(true));
+ CreateStream(Location(0, 0), 0, 128);
+ // Save the visitor before it's overwritten.
+ std::unique_ptr<webtransport::StreamVisitor> uni_stream =
+ std::move(uni_stream_);
+ CreateStream(Location(0, 1), 1, 200);
+ std::unique_ptr<webtransport::StreamVisitor> uni_stream1 =
+ std::move(uni_stream_);
+ // Timers aren't running.
+ EXPECT_EQ(OutgoingSubgroupStreamPeer::GetAlarm(
+ absl::down_cast<OutgoingSubgroupStream*>(uni_stream.get())),
+ nullptr);
+ EXPECT_EQ(OutgoingSubgroupStreamPeer::GetAlarm(
+ absl::down_cast<OutgoingSubgroupStream*>(uni_stream1.get())),
+ nullptr);
+ // Second group starts the timer.
+ EXPECT_CALL(mock_uni_stream_, visitor)
+ .WillOnce(Return(uni_stream.get()))
+ .WillOnce(Return(uni_stream1.get()))
+ .WillRepeatedly([&]() { return uni_stream_.get(); });
+ CreateStream(Location(1, 0), 0, 128);
+ // Group 0 streams now have a timer running.
+ EXPECT_NE(OutgoingSubgroupStreamPeer::GetAlarm(
+ absl::down_cast<OutgoingSubgroupStream*>(uni_stream.get())),
+ nullptr);
+ EXPECT_NE(OutgoingSubgroupStreamPeer::GetAlarm(
+ absl::down_cast<OutgoingSubgroupStream*>(uni_stream1.get())),
+ nullptr);
+ // No timer on group 1.
+ EXPECT_EQ(OutgoingSubgroupStreamPeer::GetAlarm(
+ absl::down_cast<OutgoingSubgroupStream*>(uni_stream_.get())),
+ nullptr);
+}
+
+TEST_F(SubscriptionPublisherTest, IncomingUpdateTruncatesSubscription) {
+ // Track gets to Group 5.
+ CreateStream(Location(5, 0), 0, 128);
+ parameters_.subscription_filter = SubscriptionFilter(Location(0, 0), 4);
+ publisher_->Update(parameters_);
+ EXPECT_CALL(*track_publisher_, GetCachedObject).Times(0);
+ publisher_->OnNewObjectAvailable(Location(5, 1), 0, 128);
+}
+
} // namespace
} // namespace moqt::test
diff --git a/quiche/quic/moqt/moqt_uni_stream.h b/quiche/quic/moqt/moqt_uni_stream.h
index c2921c3..fd0d7d8 100644
--- a/quiche/quic/moqt/moqt_uni_stream.h
+++ b/quiche/quic/moqt/moqt_uni_stream.h
@@ -30,7 +30,7 @@
namespace moqt {
namespace test {
-class MoqtSessionPeer;
+class OutgoingSubgroupStreamPeer;
}
// A base class for locally initiated unidirectional streams, which can serve
@@ -140,7 +140,7 @@
private:
friend class DeliveryTimeoutDelegate;
- friend class test::MoqtSessionPeer;
+ friend class test::OutgoingSubgroupStreamPeer;
// Sends objects on the stream, starting with `next_object_`, until the
// stream becomes write-blocked or closed. Can reset the stream, destroying
diff --git a/quiche/quic/moqt/moqt_uni_stream_test.cc b/quiche/quic/moqt/moqt_uni_stream_test.cc
index 7a9cc11..c021460 100644
--- a/quiche/quic/moqt/moqt_uni_stream_test.cc
+++ b/quiche/quic/moqt/moqt_uni_stream_test.cc
@@ -24,6 +24,7 @@
#include "quiche/quic/moqt/moqt_trace_recorder.h"
#include "quiche/quic/moqt/moqt_types.h"
#include "quiche/quic/moqt/test_tools/moqt_mock_visitor.h"
+#include "quiche/quic/moqt/test_tools/moqt_session_peer.h"
#include "quiche/quic/platform/api/quic_test.h"
#include "quiche/quic/test_tools/mock_clock.h"
#include "quiche/quic/test_tools/quic_test_utils.h"
@@ -93,8 +94,10 @@
EXPECT_CALL(visitor_, OnDataStreamDestroyed(index_));
}
- void CreateStream(uint64_t next_object = 0) {
+ void CreateStream(uint64_t next_object = 0) { CreateStream(0, next_object); }
+ void CreateStream(uint64_t subgroup, uint64_t next_object) {
EXPECT_CALL(mock_stream_, SetPriority);
+ index_ = DataStreamIndex(0, subgroup);
stream_ = std::make_unique<OutgoingSubgroupStream>(
framer_, &mock_stream_, index_, next_object, visitor_.GetWeakPtr(),
track_publisher_, webtransport::StreamPriority(), 0, &trace_recorder_);
@@ -236,6 +239,9 @@
EXPECT_CALL(visitor_, OnObjectSent(Location(0, 0)));
ExpectAlarm();
stream_->OnCanWrite();
+ EXPECT_CALL(mock_stream_, ResetWithUserCode(kResetCodeDeliveryTimeout));
+ EXPECT_CALL(visitor_, OnStreamTimeout(index_));
+ alarm_factory_.FireAlarm(OutgoingSubgroupStreamPeer::GetAlarm(stream_.get()));
}
TEST_F(OutgoingSubgroupStreamTest, Fin) {
@@ -249,8 +255,19 @@
EXPECT_CALL(visitor_, clock()).WillOnce(Return(&mock_clock_));
ExpectAlarm();
stream_->Fin(Location(0, 0));
- // last_object.object >= next_object: does nothing
- stream_->Fin(Location(0, 1));
+ EXPECT_CALL(mock_stream_, ResetWithUserCode(kResetCodeDeliveryTimeout));
+ EXPECT_CALL(visitor_, OnStreamTimeout(index_));
+ alarm_factory_.FireAlarm(OutgoingSubgroupStreamPeer::GetAlarm(stream_.get()));
+}
+
+TEST_F(OutgoingSubgroupStreamTest, FinForFutureObject) {
+ // Delivery is blocked.
+ EXPECT_CALL(mock_stream_, CanWrite).WillOnce(Return(false));
+ stream_->OnCanWrite();
+ // FIN does nothing because last object hasn't been sent. Rely on the cache
+ // to set object.fin_after_this.
+ EXPECT_CALL(mock_stream_, Writev).Times(0);
+ stream_->Fin(Location(0, 0));
}
TEST_F(OutgoingSubgroupStreamTest, UpdatePriority) {
diff --git a/quiche/quic/moqt/test_tools/moqt_mock_visitor.h b/quiche/quic/moqt/test_tools/moqt_mock_visitor.h
index 4b87f5e..d106fb2 100644
--- a/quiche/quic/moqt/test_tools/moqt_mock_visitor.h
+++ b/quiche/quic/moqt/test_tools/moqt_mock_visitor.h
@@ -147,7 +147,8 @@
absl::UnimplementedError("Fetch not implemented"));
}
void AddObject(Location location, uint64_t subgroup,
- absl::string_view payload, bool fin) {
+ absl::string_view payload, bool fin,
+ quic::QuicTime arrival_time = quic::QuicTime::Zero()) {
PublishedObjectMetadata metadata;
metadata.location = location;
metadata.subgroup = subgroup;
@@ -155,6 +156,7 @@
metadata.status = MoqtObjectStatus::kNormal;
metadata.publisher_priority = 128;
metadata.payload_length = payload.length();
+ metadata.arrival_time = arrival_time;
auto it = objects_.find(location);
if (it != objects_.end()) {
it->second.Append(it->second.payload_received(), payload);
diff --git a/quiche/quic/moqt/test_tools/moqt_session_peer.h b/quiche/quic/moqt/test_tools/moqt_session_peer.h
index b3c9e86..a4ba325 100644
--- a/quiche/quic/moqt/test_tools/moqt_session_peer.h
+++ b/quiche/quic/moqt/test_tools/moqt_session_peer.h
@@ -52,25 +52,6 @@
}
};
-// TODO(martinduke): When subscription-specific tests are removed from,
-// MoqtSessionTest, much of this file can be deleted (including
-// SubscriptionPublisherPeer).
-
-class SubscriptionPublisherPeer {
- public:
- static size_t num_open_streams(SubscriptionPublisher* publisher) {
- return publisher->stream_map_.GetAllStreams().size();
- }
- static std::optional<Location> largest_sent(
- const SubscriptionPublisher* publisher) {
- return publisher->largest_sent_;
- }
- static const absl::flat_hash_set<DataStreamIndex>& reset_subgroups(
- const SubscriptionPublisher* publisher) {
- return publisher->reset_subgroups_;
- }
-};
-
// Helper class to interact with MOQT bidi streams in tests.
class MoqtBidiStreamTestWrapper {
public:
@@ -97,6 +78,13 @@
std::unique_ptr<MoqtBidiStreamBase> absl_nonnull stream_;
};
+class OutgoingSubgroupStreamPeer {
+ public:
+ static quic::QuicAlarm* GetAlarm(OutgoingSubgroupStream* stream) {
+ return stream->delivery_timeout_alarm_.get();
+ }
+};
+
class MoqtSessionPeer {
public:
static constexpr webtransport::StreamId kControlStreamId = 4;
@@ -129,6 +117,11 @@
return new_stream;
}
+ static bool RequestIdIsSubscriptionPublisher(MoqtSession* session,
+ uint64_t request_id) {
+ return session->published_subscriptions_.contains(request_id);
+ }
+
// In the test OnSessionReady, the session creates a stream and then passes
// its unique_ptr to the mock webtransport stream. This function casts
// that unique_ptr into a MoqtSession::Stream*, which is a private class of
@@ -158,59 +151,6 @@
std::move(track));
}
- static MoqtObjectListener* AddSubscription(
- MoqtSession* session, std::shared_ptr<MoqtTrackPublisher> publisher,
- uint64_t subscribe_id, uint64_t track_alias, uint64_t start_group,
- uint64_t start_object) {
- MessageParameters parameters;
- parameters.subscription_filter.emplace(Location(start_group, start_object));
- MoqtSubscribe subscribe(subscribe_id, publisher->GetTrackName(),
- parameters);
- subscribe.parameters.set_forward(true);
- subscribe.parameters.subscription_filter.emplace(
- Location(start_group, start_object));
- subscribe.parameters.subscriber_priority = 0x80;
- subscribe.parameters.group_order = MoqtDeliveryOrder::kAscending;
- session->published_subscriptions_.emplace(
- subscribe_id,
- std::make_unique<SubscriptionPublisher>(
- session->framer_, std::move(publisher), session->GetControlStream(),
- subscribe_id, track_alias, subscribe.parameters, session,
- /*monitoring_interface=*/nullptr, session->callbacks_.clock,
- session->trace_recorder_));
- return session->published_subscriptions_[subscribe_id].get();
- }
-
- static bool InSubscriptionWindow(MoqtObjectListener* subscription,
- Location sequence) {
- std::optional<SubscriptionFilter> filter =
- absl::down_cast<SubscriptionPublisher*>(subscription)
- ->parameters()
- .subscription_filter;
- return (!filter.has_value() || filter->InWindow(sequence));
- }
-
- static MoqtObjectListener* GetSubscription(MoqtSession* session,
- uint64_t subscribe_id) {
- auto it = session->published_subscriptions_.find(subscribe_id);
- if (it == session->published_subscriptions_.end()) {
- return nullptr;
- }
- return it->second.get();
- }
-
- static void DeleteSubscription(MoqtSession* session, uint64_t subscribe_id) {
- session->published_subscriptions_.erase(subscribe_id);
- }
-
- static void UpdateSubscriberPriority(MoqtSession* session,
- uint64_t subscribe_id,
- MoqtPriority priority) {
- MessageParameters parameters;
- parameters.subscriber_priority = priority;
- session->published_subscriptions_[subscribe_id]->Update(parameters);
- }
-
static SubscribeRemoteTrack* remote_track(MoqtSession* session,
uint64_t track_alias) {
return session->RemoteTrackByAlias(track_alias);
@@ -237,12 +177,6 @@
session->ValidateRequestId(id);
}
- static std::optional<Location> LargestSentForSubscription(
- MoqtSession* session, uint64_t subscribe_id) {
- return SubscriptionPublisherPeer::largest_sent(
- session->published_subscriptions_[subscribe_id].get());
- }
-
// Adds an upstream fetch and a stream ready to receive data.
static std::unique_ptr<MoqtFetchTask> CreateUpstreamFetch(
MoqtSession* session, webtransport::Stream* stream) {
@@ -285,11 +219,6 @@
return session->callbacks_.clock->ApproximateNow();
}
- static quic::QuicAlarm* GetAlarm(webtransport::StreamVisitor* visitor) {
- return absl::down_cast<OutgoingSubgroupStream*>(visitor)
- ->delivery_timeout_alarm_.get();
- }
-
static quic::QuicAlarm* GetPublishDoneAlarm(
SubscribeRemoteTrack* subscription) {
return subscription->publish_done_alarm_.get();
@@ -299,23 +228,13 @@
return session->goaway_timeout_alarm_.get();
}
- static quic::QuicTimeDelta GetDeliveryTimeout(
- MoqtObjectListener* subscription) {
- return absl::down_cast<SubscriptionPublisher*>(subscription)
- ->delivery_timeout();
- }
- static void SetDeliveryTimeout(MoqtObjectListener* subscription,
- quic::QuicTimeDelta timeout) {
- absl::down_cast<SubscriptionPublisher*>(subscription)
- ->parameters()
- .delivery_timeout = timeout;
- }
-
- static bool SubgroupHasBeenReset(MoqtObjectListener* subscription,
- DataStreamIndex index) {
- return SubscriptionPublisherPeer::reset_subgroups(
- absl::down_cast<SubscriptionPublisher*>(subscription))
- .contains(index);
+ static quic::QuicTimeDelta GetDeliveryTimeout(MoqtSession* session,
+ uint64_t request_id) {
+ auto it = session->published_subscriptions_.find(request_id);
+ if (it == session->published_subscriptions_.end()) {
+ return quic::QuicTimeDelta::Zero();
+ }
+ return it->second->delivery_timeout();
}
static absl::string_view GetImplementationString(MoqtSession* session) {
@@ -329,6 +248,13 @@
static const MoqtSessionParameters& GetParameters(MoqtSession* session) {
return session->parameters_;
}
+
+ static std::optional<uint64_t> NextQueuedRequestIdToServer(
+ MoqtSession* session) {
+ return session->subscriptions_with_queued_streams_.empty()
+ ? std::optional<uint64_t>()
+ : session->subscriptions_with_queued_streams_.begin()->second;
+ }
};
} // namespace moqt::test