| // Copyright 2026 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "quiche/quic/moqt/moqt_uni_stream.h" |
| |
| #include <cstdint> |
| #include <memory> |
| #include <optional> |
| #include <string> |
| #include <utility> |
| |
| #include "absl/status/status.h" |
| #include "absl/strings/string_view.h" |
| #include "absl/types/span.h" |
| #include "quiche/quic/core/quic_alarm_factory.h" |
| #include "quiche/quic/core/quic_time.h" |
| #include "quiche/quic/moqt/moqt_error.h" |
| #include "quiche/quic/moqt/moqt_framer.h" |
| #include "quiche/quic/moqt/moqt_key_value_pair.h" |
| #include "quiche/quic/moqt/moqt_names.h" |
| #include "quiche/quic/moqt/moqt_object.h" |
| #include "quiche/quic/moqt/moqt_trace_recorder.h" |
| #include "quiche/quic/moqt/moqt_types.h" |
| #include "quiche/quic/moqt/test_tools/moqt_mock_visitor.h" |
| #include "quiche/quic/platform/api/quic_test.h" |
| #include "quiche/quic/test_tools/mock_clock.h" |
| #include "quiche/quic/test_tools/quic_test_utils.h" |
| #include "quiche/common/platform/api/quiche_expect_bug.h" |
| #include "quiche/common/quiche_mem_slice.h" |
| #include "quiche/common/quiche_weak_ptr.h" |
| #include "quiche/web_transport/test_tools/mock_web_transport.h" |
| #include "quiche/web_transport/web_transport.h" |
| |
| namespace moqt::test { |
| |
| namespace { |
| |
| using ::testing::Optional; |
| using ::testing::Return; |
| using ::testing::ReturnRef; |
| using ::testing::StrictMock; |
| |
| // Builds the object that most tests hand back from the track publisher: a |
| // normal-status object at Location(0, 0) in subgroup 0 that carries the |
| // 7-byte payload "payload" and does not request a FIN after it. |
| PublishedObject DefaultObject() { |
|   PublishedObject result; |
|   result.fin_after_this = false; |
|   result.payload.push_back(quiche::QuicheMemSlice::Copy("payload")); |
| |
|   auto& metadata = result.metadata; |
|   metadata.location = Location(0, 0); |
|   metadata.subgroup = 0; |
|   metadata.status = MoqtObjectStatus::kNormal; |
|   metadata.arrival_time = quic::QuicTime::Zero(); |
|   // Matches the length of the "payload" slice pushed above. |
|   metadata.payload_length = 7; |
|   return result; |
| } |
| |
| // gMock implementation of SubscriptionPublisherInterface. It can also hand |
| // out weak pointers to itself, which is the form in which the stream under |
| // test receives it. |
| class MockSubscriptionPublisherInterface |
|     : public SubscriptionPublisherInterface { |
|  public: |
|   MockSubscriptionPublisherInterface() : weak_ptr_factory_(this) {} |
| |
|   // Queries about the subscription and its environment. |
|   MOCK_METHOD(bool, InWindow, (Location), (override)); |
|   MOCK_METHOD(quic::QuicTimeDelta, delivery_timeout, (), (override)); |
|   MOCK_METHOD(bool, alternate_delivery_timeout, (), (override)); |
|   MOCK_METHOD(quic::QuicClock*, clock, (), (override)); |
|   MOCK_METHOD(quic::QuicAlarmFactory*, alarm_factory, (), (override)); |
| |
|   // Notifications about stream and object events. |
|   MOCK_METHOD(void, OnObjectSent, (Location), (override)); |
|   MOCK_METHOD(void, OnStreamTimeout, (DataStreamIndex), (override)); |
|   MOCK_METHOD(void, OnSubgroupAbandoned, |
|               (uint64_t, uint64_t, webtransport::StreamErrorCode), (override)); |
|   MOCK_METHOD(void, OnDataStreamDestroyed, (DataStreamIndex), (override)); |
| |
|   // Returns a new weak pointer to this mock. |
|   quiche::QuicheWeakPtr<SubscriptionPublisherInterface> GetWeakPtr() { |
|     return weak_ptr_factory_.Create(); |
|   } |
| |
|  private: |
|   quiche::QuicheWeakPtrFactory<SubscriptionPublisherInterface> |
|       weak_ptr_factory_; |
| }; |
| |
| // Fixture owning an OutgoingSubgroupStream that is wired to strict mocks of |
| // the WebTransport stream, the track publisher and the subscription |
| // publisher, so every interaction has to be expected explicitly. |
| class OutgoingSubgroupStreamTest : public quic::test::QuicTest { |
|  public: |
|   // Creates the stream under test for index (0, 0), starting at object 0. |
|   OutgoingSubgroupStreamTest() |
|       : index_(0, 0), |
|         track_publisher_(std::make_shared<StrictMock<MockTrackPublisher>>( |
|             FullTrackName("foo", "bar"))), |
|         trace_recorder_(nullptr) { |
|     // The stream ID may be queried any number of times, including zero. |
|     EXPECT_CALL(mock_stream_, GetStreamId()).WillRepeatedly(Return(14)); |
|     CreateStream(); |
|   } |
| |
|   // Registered in the destructor body because stream_ is the last declared |
|   // member and is therefore destroyed first, right after this body runs. |
|   // NOTE(review): presumably that destruction is what reports |
|   // OnDataStreamDestroyed to the visitor - confirm against the stream. |
|   ~OutgoingSubgroupStreamTest() override { |
|     EXPECT_CALL(visitor_, OnDataStreamDestroyed(index_)); |
|   } |
| |
|   // Expects exactly one alarm_factory() query and answers it with the mock |
|   // alarm factory. |
|   void ExpectAlarm() { |
|     EXPECT_CALL(visitor_, alarm_factory()).WillOnce(Return(&alarm_factory_)); |
|   } |
| |
|   // Expects exactly one Writev() that carries no data and sets the FIN bit. |
|   void ExpectFin() { |
|     EXPECT_CALL(mock_stream_, Writev) |
|         .WillOnce([](absl::Span<quiche::QuicheMemSlice> slices, |
|                      const webtransport::StreamWriteOptions& write_options) { |
|           EXPECT_TRUE(slices.empty()); |
|           EXPECT_TRUE(write_options.send_fin()); |
|           return absl::OkStatus(); |
|         }); |
|   } |
| |
|   // (Re)creates stream_ so that it starts at |next_object|. Construction is |
|   // expected to set the priority on the underlying stream exactly once. |
|   void CreateStream(uint64_t next_object = 0) { |
|     EXPECT_CALL(mock_stream_, SetPriority); |
|     stream_ = std::make_unique<OutgoingSubgroupStream>( |
|         framer_, &mock_stream_, index_, next_object, visitor_.GetWeakPtr(), |
|         track_publisher_, webtransport::StreamPriority(), 0, |
|         &trace_recorder_); |
|   } |
| |
|   // Declaration order is significant: stream_ comes last so that it is |
|   // destroyed while everything it points at is still alive. |
|   MoqtFramer framer_{true}; |
|   StrictMock<webtransport::test::MockStream> mock_stream_; |
|   DataStreamIndex index_; |
|   std::shared_ptr<StrictMock<MockTrackPublisher>> track_publisher_; |
|   StrictMock<MockSubscriptionPublisherInterface> visitor_; |
|   MoqtTraceRecorder trace_recorder_; |
|   TrackExtensions track_extensions_; |
|   quic::MockClock mock_clock_; |
|   quic::test::MockAlarmFactory alarm_factory_; |
|   std::unique_ptr<OutgoingSubgroupStream> stream_; |
| }; |
| |
| // When the underlying stream reports it cannot be written to, OnCanWrite() |
| // must not touch anything else; the strict mocks reject any further call. |
| TEST_F(OutgoingSubgroupStreamTest, OnCanWrite) { |
|   EXPECT_CALL(mock_stream_, CanWrite()) |
|       .WillOnce(Return(false)); |
| |
|   stream_->OnCanWrite(); |
| } |
| |
| TEST_F(OutgoingSubgroupStreamTest, OnStopSendingReceived) { |
| EXPECT_CALL(visitor_, OnSubgroupAbandoned(index_.group, index_.subgroup, 1)); |
| stream_->OnStopSendingReceived(1); |
| } |
| |
| // Firing the delivery-timeout delegate reports the timeout to the visitor |
| // and resets the stream with kResetCodeDeliveryTimeout. |
| TEST_F(OutgoingSubgroupStreamTest, DeliveryTimeoutAlarm) { |
|   EXPECT_CALL(mock_stream_, ResetWithUserCode(kResetCodeDeliveryTimeout)); |
|   EXPECT_CALL(visitor_, OnStreamTimeout(index_)); |
| |
|   OutgoingSubgroupStream::DeliveryTimeoutDelegate timeout_delegate( |
|       stream_.get()); |
|   timeout_delegate.OnAlarm(); |
| } |
| |
| // Happy path: one in-window, not-yet-expired object is fetched from the |
| // cache, written to the stream and reported as sent; the second CanWrite() |
| // query returns false. |
| TEST_F(OutgoingSubgroupStreamTest, OnCanWriteCompleteFlow) { |
|   // Track publisher: serves the default object and the track extensions. |
|   EXPECT_CALL(*track_publisher_, GetCachedObject(0, Optional(0), 0, 0)) |
|       .WillOnce(Return(DefaultObject())); |
|   EXPECT_CALL(*track_publisher_, extensions()) |
|       .WillRepeatedly(ReturnRef(track_extensions_)); |
| |
|   // Subscription publisher: object is in the window, with a 1 s timeout. |
|   EXPECT_CALL(visitor_, InWindow(Location(0, 0))).WillOnce(Return(true)); |
|   EXPECT_CALL(visitor_, alternate_delivery_timeout()).WillOnce(Return(false)); |
|   EXPECT_CALL(visitor_, delivery_timeout()) |
|       .WillOnce(Return(quic::QuicTimeDelta::FromSeconds(1))); |
|   EXPECT_CALL(visitor_, clock()).WillOnce(Return(&mock_clock_)); |
|   EXPECT_CALL(visitor_, OnObjectSent(Location(0, 0))); |
| |
|   // WebTransport stream: writable once, and the single write succeeds. |
|   EXPECT_CALL(mock_stream_, CanWrite()) |
|       .WillOnce(Return(true)) |
|       .WillOnce(Return(false)); |
|   EXPECT_CALL(mock_stream_, Writev).WillOnce(Return(absl::OkStatus())); |
| |
|   stream_->OnCanWrite(); |
| } |
| |
| // If the next cached object is outside the subscription window, the stream |
| // sends nothing but an empty write carrying the FIN. |
| TEST_F(OutgoingSubgroupStreamTest, OnCanWriteNotInWindow) { |
|   EXPECT_CALL(*track_publisher_, GetCachedObject(0, Optional(0), 0, 0)) |
|       .WillOnce(Return(DefaultObject())); |
|   EXPECT_CALL(visitor_, InWindow(Location(0, 0))).WillOnce(Return(false)); |
|   EXPECT_CALL(mock_stream_, CanWrite()).WillOnce(Return(true)); |
|   ExpectFin(); |
| |
|   stream_->OnCanWrite(); |
| } |
| |
| // The default object arrived at QuicTime::Zero() and the clock is moved |
| // forward by 2 s while the delivery timeout is 1 s, so instead of writing the |
| // object the stream reports a timeout and resets itself. |
| TEST_F(OutgoingSubgroupStreamTest, OnCanWriteTimeout) { |
|   mock_clock_.AdvanceTime(quic::QuicTimeDelta::FromSeconds(2)); |
| |
|   EXPECT_CALL(*track_publisher_, GetCachedObject(0, Optional(0), 0, 0)) |
|       .WillOnce(Return(DefaultObject())); |
| |
|   EXPECT_CALL(visitor_, InWindow(Location(0, 0))).WillOnce(Return(true)); |
|   EXPECT_CALL(visitor_, alternate_delivery_timeout()).WillOnce(Return(false)); |
|   EXPECT_CALL(visitor_, delivery_timeout()) |
|       .WillOnce(Return(quic::QuicTimeDelta::FromSeconds(1))); |
|   EXPECT_CALL(visitor_, clock()).WillOnce(Return(&mock_clock_)); |
|   EXPECT_CALL(visitor_, OnStreamTimeout(index_)); |
| |
|   EXPECT_CALL(mock_stream_, CanWrite()).WillOnce(Return(true)); |
|   EXPECT_CALL(mock_stream_, ResetWithUserCode(kResetCodeDeliveryTimeout)); |
| |
|   stream_->OnCanWrite(); |
| } |
| |
| // A Writev() failure right after CanWrite() returned true is treated as a |
| // bug: the stream is reset with kResetCodeInternalError and a QUICHE_BUG with |
| // the write status is raised. No OnObjectSent is expected. |
| TEST_F(OutgoingSubgroupStreamTest, OnCanWriteWriteError) { |
|   EXPECT_CALL(*track_publisher_, GetCachedObject(0, Optional(0), 0, 0)) |
|       .WillOnce(Return(DefaultObject())); |
|   EXPECT_CALL(*track_publisher_, extensions()) |
|       .WillRepeatedly(ReturnRef(track_extensions_)); |
| |
|   EXPECT_CALL(visitor_, InWindow(Location(0, 0))).WillOnce(Return(true)); |
|   EXPECT_CALL(visitor_, alternate_delivery_timeout()).WillOnce(Return(false)); |
|   EXPECT_CALL(visitor_, delivery_timeout()) |
|       .WillOnce(Return(quic::QuicTimeDelta::FromSeconds(1))); |
|   EXPECT_CALL(visitor_, clock).WillOnce(Return(&mock_clock_)); |
| |
|   EXPECT_CALL(mock_stream_, CanWrite()).WillOnce(Return(true)); |
|   EXPECT_CALL(mock_stream_, Writev) |
|       .WillOnce(Return(absl::InternalError("error"))); |
|   EXPECT_CALL(mock_stream_, ResetWithUserCode(kResetCodeInternalError)); |
| |
|   EXPECT_QUICHE_BUG( |
|       stream_->OnCanWrite(), |
|       "Writing into MoQT stream failed despite CanWrite being true before; " |
|       "status: INTERNAL: error"); |
| } |
| |
| // Writing an object that is marked fin_after_this makes the stream ask the |
| // visitor for its alarm factory exactly once. |
| // NOTE(review): presumably to arm the delivery-timeout alarm - confirm. |
| TEST_F(OutgoingSubgroupStreamTest, OnCanWriteSetsAlarm) { |
|   PublishedObject final_object = DefaultObject(); |
|   final_object.fin_after_this = true; |
| |
|   EXPECT_CALL(*track_publisher_, GetCachedObject(0, Optional(0), 0, 0)) |
|       .WillOnce(Return(std::move(final_object))); |
|   EXPECT_CALL(*track_publisher_, extensions()) |
|       .WillRepeatedly(ReturnRef(track_extensions_)); |
| |
|   EXPECT_CALL(visitor_, InWindow(Location(0, 0))).WillOnce(Return(true)); |
|   EXPECT_CALL(visitor_, alternate_delivery_timeout()) |
|       .WillRepeatedly(Return(false)); |
|   EXPECT_CALL(visitor_, delivery_timeout()) |
|       .WillRepeatedly(Return(quic::QuicTimeDelta::FromSeconds(1))); |
|   EXPECT_CALL(visitor_, clock).WillOnce(Return(&mock_clock_)); |
|   EXPECT_CALL(visitor_, OnObjectSent(Location(0, 0))); |
|   ExpectAlarm(); |
| |
|   EXPECT_CALL(mock_stream_, CanWrite()) |
|       .WillOnce(Return(true)) |
|       .WillOnce(Return(false)); |
|   EXPECT_CALL(mock_stream_, Writev).WillOnce(Return(absl::OkStatus())); |
| |
|   stream_->OnCanWrite(); |
| } |
| |
| // Fin() sends a bare FIN only when the final object precedes the next |
| // object the stream would send; otherwise it is a no-op. |
| TEST_F(OutgoingSubgroupStreamTest, Fin) { |
|   // Swap in a stream whose next object is 1. Replacing stream_ destroys the |
|   // one built by the fixture, hence the extra OnDataStreamDestroyed. |
|   EXPECT_CALL(visitor_, OnDataStreamDestroyed(index_)); |
|   CreateStream(/*next_object=*/1); |
| |
|   // Final object 0 is before next object 1: a pure FIN goes out, and the |
|   // timeout, clock and alarm factory are each queried once. |
|   const Location already_sent(0, 0); |
|   ExpectFin(); |
|   ExpectAlarm(); |
|   EXPECT_CALL(visitor_, clock()).WillOnce(Return(&mock_clock_)); |
|   EXPECT_CALL(visitor_, delivery_timeout()) |
|       .WillOnce(Return(quic::QuicTimeDelta::FromSeconds(1))); |
|   stream_->Fin(already_sent); |
| |
|   // Final object 1 is not before next object 1: nothing may happen, which |
|   // the strict mocks enforce. |
|   const Location not_yet_sent(0, 1); |
|   stream_->Fin(not_yet_sent); |
| } |
| |
| // UpdatePriority(0) pushes a recomputed priority to the underlying stream. |
| // NOTE(review): the expected send order is copied from the existing |
| // expectation; its derivation is not visible in this file. |
| TEST_F(OutgoingSubgroupStreamTest, UpdatePriority) { |
|   const webtransport::StreamPriority expected_priority{ |
|       0, 0x3fc0000000000000ULL}; |
|   EXPECT_CALL(mock_stream_, SetPriority(expected_priority)); |
| |
|   stream_->UpdatePriority(0); |
| } |
| |
| // Sends a single 15-byte object whose payload becomes available in two |
| // steps: "part1" + "part2" first, then "part3". The first write must not |
| // carry a FIN even though fin_after_this is set, because the object is still |
| // incomplete; the FIN goes out with the last fragment. OnObjectSent is |
| // expected during the first OnCanWrite() and must not fire during the second. |
| TEST_F(OutgoingSubgroupStreamTest, SendFragmentedObject) { |
|   // First fragment: 10 of the 15 payload bytes, split over two slices. |
|   PublishedObject obj0 = DefaultObject(); |
|   obj0.metadata.payload_length = 15; |
|   obj0.payload.clear(); |
|   obj0.payload.push_back(quiche::QuicheMemSlice::Copy("part1")); |
|   obj0.payload.push_back(quiche::QuicheMemSlice::Copy("part2")); |
|   obj0.fin_after_this = true; |
|   EXPECT_CALL(*track_publisher_, GetCachedObject(0, Optional(0), 0, 0)) |
|       .WillOnce(Return(std::move(obj0))); |
|   EXPECT_CALL(visitor_, InWindow).WillRepeatedly(Return(true)); |
|   EXPECT_CALL(mock_stream_, CanWrite).WillRepeatedly(Return(true)); |
|   EXPECT_CALL(visitor_, delivery_timeout()) |
|       .WillRepeatedly(Return(quic::QuicTimeDelta::FromSeconds(1))); |
|   EXPECT_CALL(visitor_, alternate_delivery_timeout()) |
|       .WillRepeatedly(Return(false)); |
|   EXPECT_CALL(visitor_, clock()).WillRepeatedly(Return(&mock_clock_)); |
|   EXPECT_CALL(*track_publisher_, extensions()) |
|       .WillRepeatedly(ReturnRef(track_extensions_)); |
|   EXPECT_CALL(mock_stream_, Writev) |
|       .WillOnce([](absl::Span<quiche::QuicheMemSlice> data, |
|                    const webtransport::StreamWriteOptions& options) { |
|         // NOTE(review): slice 0 is presumably the serialized header; only |
|         // the two payload slices are inspected. |
|         EXPECT_EQ(data.size(), 3); |
|         // EXPECT_EQ is non-fatal, so only index into the span once its size |
|         // is known to be right; otherwise a mismatch would read out of |
|         // bounds instead of failing cleanly. |
|         if (data.size() == 3) { |
|           EXPECT_EQ(data[1].AsStringView(), "part1"); |
|           EXPECT_EQ(data[2].AsStringView(), "part2"); |
|         } |
|         EXPECT_FALSE(options.send_fin()); |
|         return absl::OkStatus(); |
|       }); |
|   EXPECT_CALL(visitor_, OnObjectSent(Location(0, 0))); |
|   stream_->OnCanWrite(); |
| |
|   // Second fragment: the remaining 5 bytes, requested with 10 as the last |
|   // GetCachedObject() argument. |
|   // NOTE(review): that argument is presumably the payload offset already |
|   // written - confirm against the publisher interface. |
|   PublishedObject obj1 = DefaultObject(); |
|   obj1.metadata.payload_length = 15; |
|   obj1.payload.clear(); |
|   obj1.payload.push_back(quiche::QuicheMemSlice::Copy("part3")); |
|   obj1.fin_after_this = true; |
|   EXPECT_CALL(*track_publisher_, GetCachedObject(0, Optional(0), 0, 10)) |
|       .WillOnce(Return(std::move(obj1))); |
|   // Nothing is cached yet for the following object. |
|   EXPECT_CALL(*track_publisher_, GetCachedObject(0, Optional(0), 1, 0)) |
|       .WillOnce(Return(std::nullopt)); |
|   EXPECT_CALL(mock_stream_, Writev) |
|       .WillOnce([](absl::Span<quiche::QuicheMemSlice> data, |
|                    const webtransport::StreamWriteOptions& options) { |
|         EXPECT_EQ(data.size(), 1); |
|         // Same guard as above: never touch data[0] on an empty span. |
|         if (data.size() == 1) { |
|           EXPECT_EQ(data[0].AsStringView(), "part3"); |
|         } |
|         EXPECT_TRUE(options.send_fin()); |
|         return absl::OkStatus(); |
|       }); |
|   // Declared after the earlier expectation so that it takes precedence: any |
|   // further OnObjectSent call is a failure. |
|   EXPECT_CALL(visitor_, OnObjectSent).Times(0); |
|   ExpectAlarm(); |
|   stream_->OnCanWrite(); |
| } |
| |
| } // namespace |
| |
| } // namespace moqt::test |