blob: 4bcf4a1626fe5af2c8a2f5f79954d9fb59a50974 [file]
// Copyright 2026 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "quiche/quic/moqt/moqt_uni_stream.h"
#include <cstdint>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include "absl/types/span.h"
#include "quiche/quic/core/quic_alarm_factory.h"
#include "quiche/quic/core/quic_time.h"
#include "quiche/quic/moqt/moqt_error.h"
#include "quiche/quic/moqt/moqt_fetch_task.h"
#include "quiche/quic/moqt/moqt_framer.h"
#include "quiche/quic/moqt/moqt_key_value_pair.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_names.h"
#include "quiche/quic/moqt/moqt_object.h"
#include "quiche/quic/moqt/moqt_trace_recorder.h"
#include "quiche/quic/moqt/moqt_track.h"
#include "quiche/quic/moqt/moqt_types.h"
#include "quiche/quic/moqt/test_tools/moqt_mock_visitor.h"
#include "quiche/quic/moqt/test_tools/moqt_session_peer.h"
#include "quiche/quic/platform/api/quic_test.h"
#include "quiche/quic/test_tools/mock_clock.h"
#include "quiche/quic/test_tools/quic_test_utils.h"
#include "quiche/common/platform/api/quiche_expect_bug.h"
#include "quiche/common/quiche_mem_slice.h"
#include "quiche/common/quiche_weak_ptr.h"
#include "quiche/web_transport/test_tools/in_memory_stream.h"
#include "quiche/web_transport/test_tools/mock_web_transport.h"
#include "quiche/web_transport/web_transport.h"
namespace moqt::test {
namespace {
using ::testing::Optional;
using ::testing::Return;
using ::testing::ReturnRef;
using ::testing::StrictMock;
PublishedObject DefaultObject() {
PublishedObject object;
object.metadata.location = Location(0, 0);
object.metadata.subgroup = 0;
object.metadata.status = MoqtObjectStatus::kNormal;
object.metadata.arrival_time = quic::QuicTime::Zero();
object.metadata.payload_length = 7;
object.payload.push_back(quiche::QuicheMemSlice::Copy("payload"));
object.fin_after_this = false;
return object;
}
class MockSubscriptionPublisherInterface
: public SubscriptionPublisherInterface {
public:
MockSubscriptionPublisherInterface() : weak_ptr_factory_(this) {}
MOCK_METHOD(bool, InWindow, (Location), (override));
MOCK_METHOD(bool, alternate_delivery_timeout, (), (override));
MOCK_METHOD(quic::QuicClock*, clock, (), (override));
MOCK_METHOD(quic::QuicTimeDelta, delivery_timeout, (), (override));
MOCK_METHOD(quic::QuicAlarmFactory*, alarm_factory, (), (override));
MOCK_METHOD(void, OnObjectSent, (Location), (override));
MOCK_METHOD(void, OnStreamTimeout, (DataStreamIndex), (override));
MOCK_METHOD(void, OnSubgroupAbandoned,
(uint64_t, uint64_t, webtransport::StreamErrorCode), (override));
MOCK_METHOD(void, OnDataStreamDestroyed, (DataStreamIndex), (override));
quiche::QuicheWeakPtr<SubscriptionPublisherInterface> GetWeakPtr() {
return weak_ptr_factory_.Create();
}
private:
quiche::QuicheWeakPtrFactory<SubscriptionPublisherInterface>
weak_ptr_factory_;
};
class OutgoingSubgroupStreamTest : public quic::test::QuicTest {
public:
OutgoingSubgroupStreamTest()
: index_(0, 0),
track_publisher_(std::make_shared<StrictMock<MockTrackPublisher>>(
FullTrackName("foo", "bar"))),
trace_recorder_(nullptr) {
EXPECT_CALL(mock_stream_, GetStreamId()).WillRepeatedly(Return(14));
CreateStream();
}
~OutgoingSubgroupStreamTest() override {
EXPECT_CALL(visitor_, OnDataStreamDestroyed(index_));
}
void CreateStream(uint64_t next_object = 0) { CreateStream(0, next_object); }
void CreateStream(uint64_t subgroup, uint64_t next_object) {
EXPECT_CALL(mock_stream_, SetPriority);
index_ = DataStreamIndex(0, subgroup);
stream_ = std::make_unique<OutgoingSubgroupStream>(
framer_, &mock_stream_, index_, next_object, visitor_.GetWeakPtr(),
track_publisher_, webtransport::StreamPriority(), 0, &trace_recorder_);
}
void ExpectFin() {
EXPECT_CALL(mock_stream_, Writev)
.WillOnce([](absl::Span<quiche::QuicheMemSlice> data,
const webtransport::StreamWriteOptions& options) {
EXPECT_TRUE(data.empty());
EXPECT_TRUE(options.send_fin());
return absl::OkStatus();
});
}
void ExpectAlarm() {
EXPECT_CALL(visitor_, alarm_factory()).WillOnce(Return(&alarm_factory_));
}
MoqtFramer framer_{true};
StrictMock<webtransport::test::MockStream> mock_stream_;
DataStreamIndex index_;
std::shared_ptr<StrictMock<MockTrackPublisher>> track_publisher_;
StrictMock<MockSubscriptionPublisherInterface> visitor_;
MoqtTraceRecorder trace_recorder_;
TrackExtensions track_extensions_;
quic::MockClock mock_clock_;
quic::test::MockAlarmFactory alarm_factory_;
std::unique_ptr<OutgoingSubgroupStream> stream_;
};
TEST_F(OutgoingSubgroupStreamTest, OnCanWrite) {
EXPECT_CALL(mock_stream_, CanWrite()).WillOnce(Return(false));
stream_->OnCanWrite();
}
TEST_F(OutgoingSubgroupStreamTest, OnStopSendingReceived) {
EXPECT_CALL(visitor_, OnSubgroupAbandoned(index_.group, index_.subgroup, 1));
stream_->OnStopSendingReceived(1);
}
TEST_F(OutgoingSubgroupStreamTest, DeliveryTimeoutAlarm) {
OutgoingSubgroupStream::DeliveryTimeoutDelegate delegate(stream_.get());
EXPECT_CALL(visitor_, OnStreamTimeout(index_));
EXPECT_CALL(mock_stream_, ResetWithUserCode(kResetCodeDeliveryTimeout));
delegate.OnAlarm();
}
TEST_F(OutgoingSubgroupStreamTest, OnCanWriteCompleteFlow) {
PublishedObject obj0 = DefaultObject();
EXPECT_CALL(mock_stream_, CanWrite())
.WillOnce(Return(true))
.WillOnce(Return(false));
EXPECT_CALL(*track_publisher_, GetCachedObject(0, Optional(0), 0, 0))
.WillOnce(Return(std::move(obj0)));
EXPECT_CALL(visitor_, InWindow(Location(0, 0))).WillOnce(Return(true));
EXPECT_CALL(visitor_, delivery_timeout())
.WillOnce(Return(quic::QuicTimeDelta::FromSeconds(1)));
EXPECT_CALL(visitor_, alternate_delivery_timeout()).WillOnce(Return(false));
EXPECT_CALL(visitor_, clock()).WillOnce(Return(&mock_clock_));
EXPECT_CALL(*track_publisher_, extensions())
.WillRepeatedly(ReturnRef(track_extensions_));
EXPECT_CALL(mock_stream_, Writev).WillOnce(Return(absl::OkStatus()));
EXPECT_CALL(visitor_, OnObjectSent(Location(0, 0)));
stream_->OnCanWrite();
}
TEST_F(OutgoingSubgroupStreamTest, OnCanWriteNotInWindow) {
PublishedObject obj0 = DefaultObject();
EXPECT_CALL(mock_stream_, CanWrite()).WillOnce(Return(true));
EXPECT_CALL(*track_publisher_, GetCachedObject(0, Optional(0), 0, 0))
.WillOnce(Return(std::move(obj0)));
EXPECT_CALL(visitor_, InWindow(Location(0, 0))).WillOnce(Return(false));
ExpectFin();
stream_->OnCanWrite();
}
TEST_F(OutgoingSubgroupStreamTest, OnCanWriteTimeout) {
PublishedObject obj0 = DefaultObject();
EXPECT_CALL(mock_stream_, CanWrite()).WillOnce(Return(true));
EXPECT_CALL(*track_publisher_, GetCachedObject(0, Optional(0), 0, 0))
.WillOnce(Return(std::move(obj0)));
EXPECT_CALL(visitor_, InWindow(Location(0, 0))).WillOnce(Return(true));
EXPECT_CALL(visitor_, delivery_timeout())
.WillOnce(Return(quic::QuicTimeDelta::FromSeconds(1)));
EXPECT_CALL(visitor_, alternate_delivery_timeout()).WillOnce(Return(false));
mock_clock_.AdvanceTime(quic::QuicTimeDelta::FromSeconds(2));
EXPECT_CALL(visitor_, clock()).WillOnce(Return(&mock_clock_));
EXPECT_CALL(visitor_, OnStreamTimeout(index_));
EXPECT_CALL(mock_stream_, ResetWithUserCode(kResetCodeDeliveryTimeout));
stream_->OnCanWrite();
}
TEST_F(OutgoingSubgroupStreamTest, OnCanWriteWriteError) {
PublishedObject obj0 = DefaultObject();
EXPECT_CALL(mock_stream_, CanWrite()).WillOnce(Return(true));
EXPECT_CALL(*track_publisher_, GetCachedObject(0, Optional(0), 0, 0))
.WillOnce(Return(std::move(obj0)));
EXPECT_CALL(visitor_, InWindow(Location(0, 0))).WillOnce(Return(true));
EXPECT_CALL(visitor_, delivery_timeout())
.WillOnce(Return(quic::QuicTimeDelta::FromSeconds(1)));
EXPECT_CALL(visitor_, alternate_delivery_timeout()).WillOnce(Return(false));
EXPECT_CALL(visitor_, clock).WillOnce(Return(&mock_clock_));
EXPECT_CALL(*track_publisher_, extensions())
.WillRepeatedly(ReturnRef(track_extensions_));
EXPECT_CALL(mock_stream_, Writev)
.WillOnce(Return(absl::InternalError("error")));
EXPECT_CALL(mock_stream_, ResetWithUserCode(kResetCodeInternalError));
EXPECT_QUICHE_BUG(
stream_->OnCanWrite(),
"Writing into MoQT stream failed despite CanWrite being true before; "
"status: INTERNAL: error");
}
TEST_F(OutgoingSubgroupStreamTest, OnCanWriteSetsAlarm) {
PublishedObject obj0 = DefaultObject();
obj0.fin_after_this = true;
EXPECT_CALL(mock_stream_, CanWrite())
.WillOnce(Return(true))
.WillOnce(Return(false));
EXPECT_CALL(*track_publisher_, GetCachedObject(0, Optional(0), 0, 0))
.WillOnce(Return(std::move(obj0)));
EXPECT_CALL(visitor_, InWindow(Location(0, 0))).WillOnce(Return(true));
EXPECT_CALL(visitor_, delivery_timeout())
.WillRepeatedly(Return(quic::QuicTimeDelta::FromSeconds(1)));
EXPECT_CALL(visitor_, alternate_delivery_timeout())
.WillRepeatedly(Return(false));
EXPECT_CALL(visitor_, clock).WillOnce(Return(&mock_clock_));
EXPECT_CALL(*track_publisher_, extensions())
.WillRepeatedly(ReturnRef(track_extensions_));
EXPECT_CALL(mock_stream_, Writev)
.WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
const webtransport::StreamWriteOptions& options) {
EXPECT_TRUE(options.send_fin());
return absl::OkStatus();
});
EXPECT_CALL(visitor_, OnObjectSent(Location(0, 0)));
ExpectAlarm();
stream_->OnCanWrite();
EXPECT_CALL(mock_stream_, ResetWithUserCode(kResetCodeDeliveryTimeout));
EXPECT_CALL(visitor_, OnStreamTimeout(index_));
alarm_factory_.FireAlarm(OutgoingSubgroupStreamPeer::GetAlarm(stream_.get()));
}
TEST_F(OutgoingSubgroupStreamTest, Fin) {
// Replace stream_ with one where next_object_ is 1.
EXPECT_CALL(visitor_, OnDataStreamDestroyed(index_));
CreateStream(1);
// last_object.object < next_object: sends pure FIN
ExpectFin();
EXPECT_CALL(visitor_, delivery_timeout())
.WillOnce(Return(quic::QuicTimeDelta::FromSeconds(1)));
EXPECT_CALL(visitor_, clock()).WillOnce(Return(&mock_clock_));
ExpectAlarm();
stream_->Fin(Location(0, 0));
EXPECT_CALL(mock_stream_, ResetWithUserCode(kResetCodeDeliveryTimeout));
EXPECT_CALL(visitor_, OnStreamTimeout(index_));
alarm_factory_.FireAlarm(OutgoingSubgroupStreamPeer::GetAlarm(stream_.get()));
}
TEST_F(OutgoingSubgroupStreamTest, FinForFutureObject) {
// Delivery is blocked.
EXPECT_CALL(mock_stream_, CanWrite).WillOnce(Return(false));
stream_->OnCanWrite();
// FIN does nothing because last object hasn't been sent. Rely on the cache
// to set object.fin_after_this.
EXPECT_CALL(mock_stream_, Writev).Times(0);
stream_->Fin(Location(0, 0));
}
TEST_F(OutgoingSubgroupStreamTest, UpdatePriority) {
EXPECT_CALL(mock_stream_, SetPriority(webtransport::StreamPriority{
0, 0x3fc0000000000000ULL}));
stream_->UpdatePriority(0);
}
TEST_F(OutgoingSubgroupStreamTest, SendFragmentedObject) {
PublishedObject obj0 = DefaultObject();
obj0.metadata.payload_length = 15;
obj0.payload.clear();
obj0.payload.push_back(quiche::QuicheMemSlice::Copy("part1"));
obj0.payload.push_back(quiche::QuicheMemSlice::Copy("part2"));
obj0.fin_after_this = true;
EXPECT_CALL(*track_publisher_, GetCachedObject(0, Optional(0), 0, 0))
.WillOnce(Return(std::move(obj0)));
EXPECT_CALL(visitor_, InWindow).WillRepeatedly(Return(true));
EXPECT_CALL(mock_stream_, CanWrite).WillRepeatedly(Return(true));
EXPECT_CALL(visitor_, delivery_timeout())
.WillRepeatedly(Return(quic::QuicTimeDelta::FromSeconds(1)));
EXPECT_CALL(visitor_, alternate_delivery_timeout())
.WillRepeatedly(Return(false));
EXPECT_CALL(visitor_, clock()).WillRepeatedly(Return(&mock_clock_));
EXPECT_CALL(*track_publisher_, extensions())
.WillRepeatedly(ReturnRef(track_extensions_));
EXPECT_CALL(mock_stream_, Writev)
.WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
const webtransport::StreamWriteOptions& options) {
EXPECT_EQ(data.size(), 3);
EXPECT_EQ(data[1].AsStringView(), "part1");
EXPECT_EQ(data[2].AsStringView(), "part2");
EXPECT_FALSE(options.send_fin());
return absl::OkStatus();
});
EXPECT_CALL(visitor_, OnObjectSent(Location(0, 0)));
stream_->OnCanWrite();
PublishedObject obj1 = DefaultObject();
obj1.metadata.payload_length = 15;
obj1.payload.clear();
obj1.payload.push_back(quiche::QuicheMemSlice::Copy("part3"));
obj1.fin_after_this = true;
EXPECT_CALL(*track_publisher_, GetCachedObject(0, Optional(0), 0, 10))
.WillOnce(Return(std::move(obj1)));
EXPECT_CALL(*track_publisher_, GetCachedObject(0, Optional(0), 1, 0))
.WillOnce(Return(std::nullopt));
EXPECT_CALL(mock_stream_, Writev)
.WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
const webtransport::StreamWriteOptions& options) {
EXPECT_EQ(data.size(), 1);
EXPECT_EQ(data[0].AsStringView(), "part3");
EXPECT_TRUE(options.send_fin());
return absl::OkStatus();
});
EXPECT_CALL(visitor_, OnObjectSent).Times(0);
ExpectAlarm();
stream_->OnCanWrite();
}
class OutgoingFetchStreamTest : public quic::test::QuicTest {
public:
OutgoingFetchStreamTest()
: task_(std::make_unique<StrictMock<MockFetchTask>>()),
task_ptr_(task_.get()),
trace_recorder_(nullptr) {
EXPECT_CALL(mock_stream_, GetStreamId()).WillRepeatedly(Return(14));
EXPECT_CALL(mock_stream_, SetPriority);
stream_ = std::make_unique<OutgoingFetchStream>(
framer_, &mock_stream_, 10, webtransport::StreamPriority(),
std::move(task_), [this]() { close_callback_called_ = true; },
&trace_recorder_);
}
~OutgoingFetchStreamTest() override {
stream_.reset();
EXPECT_TRUE(close_callback_called_);
}
protected:
MoqtFramer framer_{true};
StrictMock<webtransport::test::MockStream> mock_stream_;
std::unique_ptr<StrictMock<MockFetchTask>> task_;
MockFetchTask* task_ptr_;
MoqtTraceRecorder trace_recorder_;
bool close_callback_called_ = false;
std::unique_ptr<OutgoingFetchStream> stream_;
};
TEST_F(OutgoingFetchStreamTest, OnCanWritePending) {
EXPECT_CALL(mock_stream_, CanWrite()).WillOnce(Return(true));
EXPECT_CALL(*task_ptr_, GetNextObject)
.WillOnce(Return(MoqtFetchTask::kPending));
stream_->OnCanWrite();
}
TEST_F(OutgoingFetchStreamTest, OnCanWriteSuccess) {
PublishedObject obj = DefaultObject();
EXPECT_CALL(mock_stream_, CanWrite())
.WillOnce(Return(true))
.WillOnce(Return(false));
EXPECT_CALL(*task_ptr_, GetNextObject).WillOnce([&](PublishedObject& out) {
out = std::move(obj);
return MoqtFetchTask::kSuccess;
});
EXPECT_CALL(mock_stream_, Writev).WillOnce(Return(absl::OkStatus()));
stream_->OnCanWrite();
}
TEST_F(OutgoingFetchStreamTest, OnCanWriteNonNormalStatus) {
PublishedObject obj = DefaultObject();
obj.metadata.status = MoqtObjectStatus::kObjectDoesNotExist;
EXPECT_CALL(mock_stream_, CanWrite())
.WillOnce(Return(true))
.WillOnce(Return(false));
EXPECT_CALL(*task_ptr_, GetNextObject).WillOnce([&](PublishedObject& out) {
out = std::move(obj);
return MoqtFetchTask::kSuccess;
});
EXPECT_QUICHE_BUG(stream_->OnCanWrite(), "Got Non-normal object in FETCH");
}
TEST_F(OutgoingFetchStreamTest, OnCanWriteEof) {
EXPECT_CALL(mock_stream_, CanWrite()).WillOnce(Return(true));
EXPECT_CALL(*task_ptr_, GetNextObject).WillOnce(Return(MoqtFetchTask::kEof));
EXPECT_CALL(mock_stream_, Writev)
.WillOnce([](absl::Span<quiche::QuicheMemSlice> data,
const webtransport::StreamWriteOptions& options) {
EXPECT_TRUE(data.empty());
EXPECT_TRUE(options.send_fin());
return absl::OkStatus();
});
stream_->OnCanWrite();
}
TEST_F(OutgoingFetchStreamTest, OnCanWriteEofFail) {
EXPECT_CALL(mock_stream_, CanWrite()).WillOnce(Return(true));
EXPECT_CALL(*task_ptr_, GetNextObject).WillOnce(Return(MoqtFetchTask::kEof));
EXPECT_CALL(mock_stream_, Writev)
.WillOnce(Return(absl::InternalError("error")));
stream_->OnCanWrite();
}
TEST_F(OutgoingFetchStreamTest, OnCanWriteWriteError) {
PublishedObject obj = DefaultObject();
EXPECT_CALL(mock_stream_, CanWrite())
.WillOnce(Return(true))
.WillOnce(Return(false));
EXPECT_CALL(*task_ptr_, GetNextObject).WillOnce([&](PublishedObject& out) {
out = std::move(obj);
return MoqtFetchTask::kSuccess;
});
EXPECT_CALL(mock_stream_, Writev)
.WillOnce(Return(absl::InternalError("error")));
EXPECT_QUICHE_BUG(stream_->OnCanWrite(),
"Writing into MoQT stream failed despite CanWrite being "
"true before; status: INTERNAL: error");
}
TEST_F(OutgoingFetchStreamTest, OnCanWriteError) {
EXPECT_CALL(mock_stream_, CanWrite()).WillOnce(Return(true));
EXPECT_CALL(*task_ptr_, GetNextObject)
.WillOnce(Return(MoqtFetchTask::kError));
EXPECT_CALL(*task_ptr_, GetStatus())
.WillOnce(Return(absl::InternalError("error")));
EXPECT_CALL(
mock_stream_,
ResetWithUserCode(static_cast<uint64_t>(absl::StatusCode::kInternal)));
stream_->OnCanWrite();
}
TEST_F(OutgoingFetchStreamTest, OnStopSendingReceived) {
EXPECT_CALL(mock_stream_, ResetWithUserCode(17));
stream_->OnStopSendingReceived(17);
}
TEST_F(OutgoingFetchStreamTest, UpdatePriority) {
EXPECT_CALL(mock_stream_, SetPriority(webtransport::StreamPriority{
0, 0x3fc0000000000000ULL}));
stream_->UpdatePriority(0);
}
TEST_F(OutgoingFetchStreamTest, ObjectAvailableCallback) {
EXPECT_CALL(mock_stream_, CanWrite()).WillOnce(Return(false));
task_ptr_->CallObjectsAvailableCallback();
}
MoqtObject kDefaultObject = {
2, // track_alias
0, // group_id
0, // object_id
0x80, // publisher_priority
"", // extension_headers
MoqtObjectStatus::kNormal,
0, // subgroup_id
0, // payload_length
};
class MockSessionToUniStreamInterface : public SessionToUniStreamInterface {
public:
MockSessionToUniStreamInterface() = default;
~MockSessionToUniStreamInterface() override = default;
MOCK_METHOD(bool, deliver_partial_objects, (), (const, override));
MOCK_METHOD(void, OnMalformedTrack, (RemoteTrack*), (override));
MOCK_METHOD(quiche::QuicheWeakPtr<RemoteTrack>, GetSubscribe, (uint64_t),
(override));
MOCK_METHOD(quiche::QuicheWeakPtr<RemoteTrack>, GetFetch, (uint64_t),
(override));
MOCK_METHOD(void, Error, (MoqtError, absl::string_view), (override));
};
class IncomingDataStreamTest : public quic::test::QuicTest {
public:
IncomingDataStreamTest()
: mock_stream_(14),
ftn_("foo", "bar"),
subscribe_message_(1, ftn_, MessageParameters()) {
EXPECT_CALL(session_, deliver_partial_objects())
.WillRepeatedly(Return(false));
track_ = std::make_unique<SubscribeRemoteTrack>(
subscribe_message_, &visitor_, []() {},
[this](uint64_t alias, SubscribeRemoteTrack* track) -> bool {
alias_ = alias;
alias_track_ = track;
return true;
});
EXPECT_TRUE(track_->set_track_alias(2));
CreateStream();
}
void CreateStream() {
stream_ = std::make_unique<IncomingDataStream>(&mock_stream_, &session_,
&mock_clock_);
}
void ProcessStreamType(MoqtDataStreamType type) {
uint8_t type_byte = static_cast<uint8_t>(type.value());
mock_stream_.Receive(
absl::string_view(reinterpret_cast<const char*>(&type_byte), 1), false);
stream_->OnCanRead();
}
void ProcessAlias(uint8_t alias) {
mock_stream_.Receive(
absl::string_view(reinterpret_cast<const char*>(&alias), 1), false);
EXPECT_CALL(session_, GetSubscribe(alias))
.WillOnce(Return(track_->weak_ptr()));
stream_->OnCanRead();
EXPECT_EQ(alias_, alias);
EXPECT_EQ(alias_track_, track_.get());
}
webtransport::test::InMemoryStream mock_stream_;
testing::NiceMock<MockSessionToUniStreamInterface> session_;
quic::MockClock mock_clock_;
FullTrackName ftn_;
MoqtSubscribe subscribe_message_;
testing::NiceMock<MockSubscribeRemoteTrackVisitor> visitor_;
std::unique_ptr<SubscribeRemoteTrack> track_;
std::unique_ptr<IncomingDataStream> stream_;
uint64_t alias_ = 0;
SubscribeRemoteTrack* alias_track_ = nullptr;
};
TEST_F(IncomingDataStreamTest, DestructorBeforeTrackAlias) {
// The stream doesn't know the track, so there's no visitor to notify.
EXPECT_CALL(visitor_, OnStreamReset).Times(0);
stream_.reset();
}
TEST_F(IncomingDataStreamTest, DestructorAfterObject) {
ProcessStreamType(MoqtDataStreamType::Subgroup(0, 0, false, 0x80));
ProcessAlias(2);
EXPECT_CALL(visitor_, OnObjectFragment);
stream_->OnObjectMessage(kDefaultObject, "", true);
EXPECT_CALL(visitor_, OnStreamReset);
stream_.reset();
}
TEST_F(IncomingDataStreamTest, DestructorAfterFin) {
ProcessStreamType(MoqtDataStreamType::Subgroup(0, 0, false, 0x80));
ProcessAlias(2);
EXPECT_CALL(visitor_, OnObjectFragment);
stream_->OnObjectMessage(kDefaultObject, "", true);
stream_->OnFin();
EXPECT_CALL(visitor_, OnStreamFin);
stream_.reset();
}
TEST_F(IncomingDataStreamTest, OnParsingError) {
EXPECT_CALL(session_,
Error(MoqtError::kProtocolViolation, "Parse error: reason"))
.Times(1);
stream_->OnParsingError(MoqtError::kProtocolViolation, "reason");
}
TEST_F(IncomingDataStreamTest, OnObjectMessageNoTrackAliasError) {
EXPECT_QUICHE_BUG(stream_->OnObjectMessage(kDefaultObject, "payload", true),
"Object delivered without preliminaries");
}
TEST_F(IncomingDataStreamTest, OnObjectMessage) {
ProcessStreamType(MoqtDataStreamType::Subgroup(0, 0, false, 0x80));
ProcessAlias(2);
MoqtObject object = kDefaultObject;
object.payload_length = 8;
EXPECT_CALL(visitor_, OnObjectFragment)
.WillOnce([&](const FullTrackName& track_name,
const PublishedObjectMetadata& metadata,
const absl::string_view received_payload, uint64_t offset) {
EXPECT_EQ(track_name, ftn_);
EXPECT_EQ(metadata.location, Location(0, 0));
EXPECT_EQ(metadata.subgroup, 0);
EXPECT_EQ(metadata.extensions, "");
EXPECT_EQ(metadata.status, MoqtObjectStatus::kNormal);
EXPECT_EQ(metadata.publisher_priority, 0x80);
EXPECT_EQ(metadata.payload_length, 8);
EXPECT_EQ(received_payload, "deadbeef");
EXPECT_EQ(offset, 0);
});
stream_->OnObjectMessage(object, "deadbeef", true);
}
TEST_F(IncomingDataStreamTest, OnObjectMessageBufferPartialObject) {
ProcessStreamType(MoqtDataStreamType::Subgroup(0, 0, false, 0x80));
ProcessAlias(2);
MoqtObject object = kDefaultObject;
object.payload_length = 6;
EXPECT_CALL(visitor_, OnObjectFragment).Times(0);
stream_->OnObjectMessage(object, "foo", false);
EXPECT_CALL(visitor_, OnObjectFragment)
.WillOnce([&](const FullTrackName& track_name,
const PublishedObjectMetadata& metadata,
const absl::string_view received_payload, uint64_t offset) {
EXPECT_EQ(metadata.payload_length, 6);
EXPECT_EQ(received_payload, "foobar");
EXPECT_EQ(offset, 0);
});
stream_->OnObjectMessage(object, "bar", true);
}
TEST_F(IncomingDataStreamTest, OnObjectMessageDontBufferPartialObject) {
EXPECT_CALL(session_, deliver_partial_objects()).WillRepeatedly(Return(true));
ProcessStreamType(MoqtDataStreamType::Subgroup(0, 0, false, 0x80));
ProcessAlias(2);
MoqtObject object = kDefaultObject;
object.payload_length = 6;
EXPECT_CALL(visitor_, OnObjectFragment).Times(0);
EXPECT_CALL(visitor_, OnObjectFragment)
.WillOnce([&](const FullTrackName& track_name,
const PublishedObjectMetadata& metadata,
const absl::string_view received_payload, uint64_t offset) {
EXPECT_EQ(metadata.payload_length, 6);
EXPECT_EQ(received_payload, "foo");
EXPECT_EQ(offset, 0);
});
stream_->OnObjectMessage(object, "foo", false);
EXPECT_CALL(visitor_, OnObjectFragment)
.WillOnce([&](const FullTrackName& track_name,
const PublishedObjectMetadata& metadata,
const absl::string_view received_payload, uint64_t offset) {
EXPECT_EQ(metadata.payload_length, 6);
EXPECT_EQ(received_payload, "bar");
EXPECT_EQ(offset, 3);
});
stream_->OnObjectMessage(object, "bar", true);
// New object, make sure offset has been reset.
++object.object_id;
EXPECT_CALL(visitor_, OnObjectFragment)
.WillOnce([&](const FullTrackName& track_name,
const PublishedObjectMetadata& metadata,
const absl::string_view received_payload, uint64_t offset) {
EXPECT_EQ(metadata.payload_length, 6);
EXPECT_EQ(received_payload, "foobaz");
EXPECT_EQ(offset, 0);
});
stream_->OnObjectMessage(object, "foobaz", true);
}
TEST_F(IncomingDataStreamTest, OnObjectMessageInvalidTrack) {
ProcessStreamType(MoqtDataStreamType::Subgroup(0, 0, false, 0x80));
uint8_t alias = 2;
mock_stream_.Receive(
absl::string_view(reinterpret_cast<const char*>(&alias), 1), false);
EXPECT_CALL(session_, GetSubscribe(2))
.WillOnce(Return(quiche::QuicheWeakPtr<RemoteTrack>()));
stream_->OnCanRead();
EXPECT_TRUE(mock_stream_.was_reset());
}
TEST_F(IncomingDataStreamTest, OnObjectMessageNotInWindow) {
ProcessStreamType(MoqtDataStreamType::Subgroup(0, 0, false, 0x80));
ProcessAlias(2);
track_->parameters().set_forward(false);
EXPECT_CALL(visitor_, OnObjectFragment).Times(0);
stream_->OnObjectMessage(kDefaultObject, "", true);
}
TEST_F(IncomingDataStreamTest, OnObjectMessageMissingSubgroupId) {
ProcessStreamType(MoqtDataStreamType::Subgroup(0, 0, false, 0x80));
ProcessAlias(2);
MoqtObject object = kDefaultObject;
object.subgroup_id = std::nullopt;
EXPECT_QUICHE_BUG(stream_->OnObjectMessage(object, "", true),
"Missing subgroup ID on SUBSCRIBE stream");
}
TEST_F(IncomingDataStreamTest, ObjectAfterTrackEnd) {
ProcessStreamType(MoqtDataStreamType::Subgroup(0, 0, false, 0x80));
ProcessAlias(2);
MoqtObject object = kDefaultObject;
object.object_status = MoqtObjectStatus::kEndOfTrack;
EXPECT_CALL(visitor_, OnObjectFragment);
stream_->OnObjectMessage(object, "", true);
EXPECT_CALL(session_, OnMalformedTrack(track_.get()));
MoqtObject object2 = object;
object2.object_id = 1;
object2.object_status = MoqtObjectStatus::kNormal;
stream_->OnObjectMessage(object2, "", true);
}
TEST_F(IncomingDataStreamTest, ObjectAfterGroupEnd) {
ProcessStreamType(MoqtDataStreamType::Subgroup(0, 0, false, 0x80));
ProcessAlias(2);
MoqtObject object = kDefaultObject;
object.object_status = MoqtObjectStatus::kEndOfGroup;
EXPECT_CALL(visitor_, OnObjectFragment);
stream_->OnObjectMessage(object, "", true);
EXPECT_CALL(session_, OnMalformedTrack(track_.get()));
MoqtObject object2 = object;
object2.object_id = 1;
object2.object_status = MoqtObjectStatus::kNormal;
stream_->OnObjectMessage(object2, "", true);
}
TEST_F(IncomingDataStreamTest, MaybeReadOneObjectUnexpectedState) {
EXPECT_QUICHE_BUG(stream_->MaybeReadOneObject(),
"Requesting object, parser in unexpected state");
}
TEST_F(IncomingDataStreamTest, OnCanReadFetchNewTrackAliasInvalidFetch) {
char fetch_bytes[] = {0x05, 0x03};
mock_stream_.Receive(absl::string_view(fetch_bytes, 2), false);
EXPECT_CALL(session_, GetFetch(3))
.WillOnce(Return(quiche::QuicheWeakPtr<RemoteTrack>()));
stream_->OnCanRead();
EXPECT_TRUE(mock_stream_.was_reset());
}
TEST_F(IncomingDataStreamTest, OnCanReadFetchNewTrackAliasSuccess) {
MoqtFetch fetch;
fetch.request_id = 3;
StandaloneFetch standalone(ftn_, Location(0, 0), Location(0, 9));
auto upstream_fetch = std::make_unique<UpstreamFetch>(
fetch, standalone, [](std::unique_ptr<MoqtFetchTask>) {}, []() {});
upstream_fetch->OnFetchResult(Location(0, 0), absl::OkStatus(), []() {});
EXPECT_CALL(session_, GetFetch(3))
.WillOnce(Return(upstream_fetch->weak_ptr()));
char fetch_bytes[] = {0x05, 0x03};
mock_stream_.Receive(absl::string_view(fetch_bytes, 2), false);
stream_->OnCanRead();
}
} // namespace
} // namespace moqt::test