Get rid of moqt::SubscribeWindow. This is a legacy of when there were multiple windows on a single subscribe. Also, make relative joining fetch set the window start correctly. PiperOrigin-RevId: 902630468
diff --git a/build/source_list.bzl b/build/source_list.bzl index 9a665df..3c8fcbe 100644 --- a/build/source_list.bzl +++ b/build/source_list.bzl
@@ -1601,7 +1601,7 @@ "quic/moqt/moqt_session.h", "quic/moqt/moqt_session_callbacks.h", "quic/moqt/moqt_session_interface.h", - "quic/moqt/moqt_subscribe_windows.h", + "quic/moqt/moqt_stream_map.h", "quic/moqt/moqt_trace_recorder.h", "quic/moqt/moqt_track.h", "quic/moqt/moqt_types.h", @@ -1632,7 +1632,7 @@ "quic/moqt/moqt_relay_publisher.cc", "quic/moqt/moqt_relay_track_publisher.cc", "quic/moqt/moqt_session.cc", - "quic/moqt/moqt_subscribe_windows.cc", + "quic/moqt/moqt_stream_map.cc", "quic/moqt/moqt_trace_recorder.cc", "quic/moqt/moqt_track.cc", "quic/moqt/relay_namespace_tree.cc", @@ -1662,7 +1662,7 @@ "quic/moqt/moqt_relay_publisher_test.cc", "quic/moqt/moqt_relay_track_publisher_test.cc", "quic/moqt/moqt_session_test.cc", - "quic/moqt/moqt_subscribe_windows_test.cc", + "quic/moqt/moqt_stream_map_test.cc", "quic/moqt/moqt_track_test.cc", "quic/moqt/relay_namespace_tree_test.cc", "quic/moqt/session_namespace_tree_test.cc",
diff --git a/build/source_list.gni b/build/source_list.gni index 392014b..b926f30 100644 --- a/build/source_list.gni +++ b/build/source_list.gni
@@ -1605,7 +1605,7 @@ "src/quiche/quic/moqt/moqt_session.h", "src/quiche/quic/moqt/moqt_session_callbacks.h", "src/quiche/quic/moqt/moqt_session_interface.h", - "src/quiche/quic/moqt/moqt_subscribe_windows.h", + "src/quiche/quic/moqt/moqt_stream_map.h", "src/quiche/quic/moqt/moqt_trace_recorder.h", "src/quiche/quic/moqt/moqt_track.h", "src/quiche/quic/moqt/moqt_types.h", @@ -1636,7 +1636,7 @@ "src/quiche/quic/moqt/moqt_relay_publisher.cc", "src/quiche/quic/moqt/moqt_relay_track_publisher.cc", "src/quiche/quic/moqt/moqt_session.cc", - "src/quiche/quic/moqt/moqt_subscribe_windows.cc", + "src/quiche/quic/moqt/moqt_stream_map.cc", "src/quiche/quic/moqt/moqt_trace_recorder.cc", "src/quiche/quic/moqt/moqt_track.cc", "src/quiche/quic/moqt/relay_namespace_tree.cc", @@ -1667,7 +1667,7 @@ "src/quiche/quic/moqt/moqt_relay_publisher_test.cc", "src/quiche/quic/moqt/moqt_relay_track_publisher_test.cc", "src/quiche/quic/moqt/moqt_session_test.cc", - "src/quiche/quic/moqt/moqt_subscribe_windows_test.cc", + "src/quiche/quic/moqt/moqt_stream_map_test.cc", "src/quiche/quic/moqt/moqt_track_test.cc", "src/quiche/quic/moqt/relay_namespace_tree_test.cc", "src/quiche/quic/moqt/session_namespace_tree_test.cc",
diff --git a/build/source_list.json b/build/source_list.json index d692ed9..6f39024 100644 --- a/build/source_list.json +++ b/build/source_list.json
@@ -1604,7 +1604,7 @@ "quiche/quic/moqt/moqt_session.h", "quiche/quic/moqt/moqt_session_callbacks.h", "quiche/quic/moqt/moqt_session_interface.h", - "quiche/quic/moqt/moqt_subscribe_windows.h", + "quiche/quic/moqt/moqt_stream_map.h", "quiche/quic/moqt/moqt_trace_recorder.h", "quiche/quic/moqt/moqt_track.h", "quiche/quic/moqt/moqt_types.h", @@ -1635,7 +1635,7 @@ "quiche/quic/moqt/moqt_relay_publisher.cc", "quiche/quic/moqt/moqt_relay_track_publisher.cc", "quiche/quic/moqt/moqt_session.cc", - "quiche/quic/moqt/moqt_subscribe_windows.cc", + "quiche/quic/moqt/moqt_stream_map.cc", "quiche/quic/moqt/moqt_trace_recorder.cc", "quiche/quic/moqt/moqt_track.cc", "quiche/quic/moqt/relay_namespace_tree.cc", @@ -1666,7 +1666,7 @@ "quiche/quic/moqt/moqt_relay_publisher_test.cc", "quiche/quic/moqt/moqt_relay_track_publisher_test.cc", "quiche/quic/moqt/moqt_session_test.cc", - "quiche/quic/moqt/moqt_subscribe_windows_test.cc", + "quiche/quic/moqt/moqt_stream_map_test.cc", "quiche/quic/moqt/moqt_track_test.cc", "quiche/quic/moqt/relay_namespace_tree_test.cc", "quiche/quic/moqt/session_namespace_tree_test.cc",
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.cc b/quiche/quic/moqt/moqt_outgoing_queue.cc index b528528..3abeaaa 100644 --- a/quiche/quic/moqt/moqt_outgoing_queue.cc +++ b/quiche/quic/moqt/moqt_outgoing_queue.cc
@@ -17,7 +17,7 @@ #include "quiche/quic/moqt/moqt_object.h" #include "quiche/quic/moqt/moqt_priority.h" #include "quiche/quic/moqt/moqt_publisher.h" -#include "quiche/quic/moqt/moqt_subscribe_windows.h" +#include "quiche/quic/moqt/moqt_stream_map.h" #include "quiche/quic/moqt/moqt_types.h" #include "quiche/common/platform/api/quiche_bug_tracker.h" #include "quiche/common/quiche_mem_slice.h" @@ -108,10 +108,10 @@ std::vector<Location> MoqtOutgoingQueue::GetCachedObjectsInRange( Location start, Location end) const { std::vector<Location> sequences; - SubscribeWindow window(start, end); for (const Group& group : queue_) { for (const CachedObject& object : group) { - if (window.InWindow(object.metadata.location)) { + if (object.metadata.location >= start && + object.metadata.location <= end) { sequences.push_back(object.metadata.location); } }
diff --git a/quiche/quic/moqt/moqt_outgoing_queue_test.cc b/quiche/quic/moqt/moqt_outgoing_queue_test.cc index 60df583..07b5a5f 100644 --- a/quiche/quic/moqt/moqt_outgoing_queue_test.cc +++ b/quiche/quic/moqt/moqt_outgoing_queue_test.cc
@@ -23,7 +23,6 @@ #include "quiche/quic/moqt/moqt_object.h" #include "quiche/quic/moqt/moqt_priority.h" #include "quiche/quic/moqt/moqt_publisher.h" -#include "quiche/quic/moqt/moqt_subscribe_windows.h" #include "quiche/quic/moqt/moqt_types.h" #include "quiche/quic/moqt/test_tools/moqt_mock_visitor.h" #include "quiche/common/platform/api/quiche_expect_bug.h" @@ -72,14 +71,15 @@ } } - void GetObjectsFromPast(const SubscribeWindow& window) { + void GetObjectsFromPast(Location start, + std::optional<Location> end = std::nullopt) { if (!largest_location().has_value()) { return; } std::vector<Location> objects = GetCachedObjectsInRange(Location(0, 0), *largest_location()); for (Location object : objects) { - if (window.InWindow(object)) { + if (object >= start && (!end.has_value() || object <= *end)) { OnNewObjectAvailable(object, 0, default_publisher_priority()); } } @@ -160,7 +160,7 @@ queue.AddObject(quiche::QuicheMemSlice::Copy("a"), true); queue.AddObject(quiche::QuicheMemSlice::Copy("b"), false); queue.AddObject(quiche::QuicheMemSlice::Copy("c"), false); - queue.GetObjectsFromPast(SubscribeWindow(Location(0, 0))); + queue.GetObjectsFromPast(Location(0, 0)); } TEST(MoqtOutgoingQueue, SingleGroupPastSubscribeFromMidGroup) { @@ -177,7 +177,7 @@ queue.AddObject(quiche::QuicheMemSlice::Copy("a"), true); queue.AddObject(quiche::QuicheMemSlice::Copy("b"), false); queue.AddObject(quiche::QuicheMemSlice::Copy("c"), false); - queue.GetObjectsFromPast(SubscribeWindow(Location(0, 1))); + queue.GetObjectsFromPast(Location(0, 1)); } TEST(MoqtOutgoingQueue, TwoGroups) { @@ -225,7 +225,7 @@ queue.AddObject(quiche::QuicheMemSlice::Copy("d"), true); queue.AddObject(quiche::QuicheMemSlice::Copy("e"), false); queue.AddObject(quiche::QuicheMemSlice::Copy("f"), false); - queue.GetObjectsFromPast(SubscribeWindow(Location(0, 1))); + queue.GetObjectsFromPast(Location(0, 1)); } TEST(MoqtOutgoingQueue, FiveGroups) { @@ -298,7 +298,7 @@ queue.AddObject(quiche::QuicheMemSlice::Copy("h"), false); queue.AddObject(quiche::QuicheMemSlice::Copy("i"), true); queue.AddObject(quiche::QuicheMemSlice::Copy("j"), false); - queue.GetObjectsFromPast(SubscribeWindow(Location(0, 0))); + queue.GetObjectsFromPast(Location(0, 0)); } TEST(MoqtOutgoingQueue, StandaloneFetch) {
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc index 2424e34..10522fa 100644 --- a/quiche/quic/moqt/moqt_session.cc +++ b/quiche/quic/moqt/moqt_session.cc
@@ -43,7 +43,7 @@ #include "quiche/quic/moqt/moqt_publisher.h" #include "quiche/quic/moqt/moqt_session_callbacks.h" #include "quiche/quic/moqt/moqt_session_interface.h" -#include "quiche/quic/moqt/moqt_subscribe_windows.h" +#include "quiche/quic/moqt/moqt_stream_map.h" #include "quiche/quic/moqt/moqt_track.h" #include "quiche/quic/moqt/moqt_types.h" #include "quiche/quic/platform/api/quic_logging.h"
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h index 2bdd6ec..fbb194b 100644 --- a/quiche/quic/moqt/moqt_session.h +++ b/quiche/quic/moqt/moqt_session.h
@@ -36,7 +36,7 @@ #include "quiche/quic/moqt/moqt_publisher.h" #include "quiche/quic/moqt/moqt_session_callbacks.h" #include "quiche/quic/moqt/moqt_session_interface.h" -#include "quiche/quic/moqt/moqt_subscribe_windows.h" +#include "quiche/quic/moqt/moqt_stream_map.h" #include "quiche/quic/moqt/moqt_trace_recorder.h" #include "quiche/quic/moqt/moqt_track.h" #include "quiche/quic/moqt/moqt_types.h"
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc index 7997bd1..04af379 100644 --- a/quiche/quic/moqt/moqt_session_test.cc +++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -2778,7 +2778,7 @@ // Packet arrives on FETCH stream. MoqtObject object = { /*request_id=*/2, - /*group_id, object_id=*/0, + /*group_id, object_id=*/2, 0, /*publisher_priority=*/128, /*extension_headers=*/"", @@ -2790,12 +2790,14 @@ std::optional<PublishedObjectMetadata> metadata; quiche::QuicheBuffer header = framer.SerializeObjectHeader( object, MoqtDataStreamType::Fetch(), metadata); - // Open stream, deliver two objects before FETCH_OK. Neither should be read. webtransport::test::InMemoryStream data_stream(kIncomingUniStreamId); data_stream.SetVisitor( MoqtSessionPeer::CreateIncomingStreamVisitor(&session_, &data_stream)); data_stream.Receive(header.AsStringView(), false); EXPECT_CALL(remote_track_visitor_, OnObjectFragment).Times(1); + // Last object of the FETCH causes FETCH_CANCEL. + EXPECT_CALL(mock_stream_, + Writev(ControlMessageOfType(MoqtMessageType::kFetchCancel), _)); data_stream.Receive("foo", false); }
diff --git a/quiche/quic/moqt/moqt_subscribe_windows.cc b/quiche/quic/moqt/moqt_stream_map.cc similarity index 65% rename from quiche/quic/moqt/moqt_subscribe_windows.cc rename to quiche/quic/moqt/moqt_stream_map.cc index fd15698..c11492b 100644 --- a/quiche/quic/moqt/moqt_subscribe_windows.cc +++ b/quiche/quic/moqt/moqt_stream_map.cc
@@ -2,7 +2,7 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -#include "quiche/quic/moqt/moqt_subscribe_windows.h" +#include "quiche/quic/moqt/moqt_stream_map.h" #include <cstdint> #include <limits> @@ -29,37 +29,13 @@ void SendStreamMap::AddStream(DataStreamIndex index, webtransport::StreamId stream_id) { auto [it, success] = send_streams_.emplace(index, stream_id); - QUIC_BUG_IF(quic_bug_moqt_draft_03_02, !success) << "Stream already added"; + QUICHE_BUG_IF(quic_bug_moqt_draft_03_02, !success) << "Stream already added"; } void SendStreamMap::RemoveStream(DataStreamIndex index) { send_streams_.erase(index); } -bool SubscribeWindow::TruncateStart(Location start) { - if (start < start_) { - return false; - } - start_ = start; - return true; -} - -bool SubscribeWindow::TruncateEnd(uint64_t end_group) { - if (end_group > end_.group) { - return false; - } - end_ = Location(end_group, UINT64_MAX); - return true; -} - -bool SubscribeWindow::TruncateEnd(Location largest_id) { - if (largest_id > end_) { - return false; - } - end_ = largest_id; - return true; -} - std::vector<webtransport::StreamId> SendStreamMap::GetAllStreams() const { std::vector<webtransport::StreamId> ids; for (const auto& [index, stream_id] : send_streams_) { @@ -80,12 +56,4 @@ return ids; } -bool SubscribeWindow::GroupInWindow(uint64_t group) const { - const quic::QuicInterval<Location> group_window( - Location(group, 0), - Location(group, std::numeric_limits<uint64_t>::max())); - const quic::QuicInterval<Location> subscription_window(start_, end_); - return group_window.Intersects(subscription_window); -} - } // namespace moqt
diff --git a/quiche/quic/moqt/moqt_stream_map.h b/quiche/quic/moqt/moqt_stream_map.h new file mode 100644 index 0000000..dfb4ef6 --- /dev/null +++ b/quiche/quic/moqt/moqt_stream_map.h
@@ -0,0 +1,38 @@ +// Copyright 2023 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef QUICHE_QUIC_MOQT_STREAM_MAP_H +#define QUICHE_QUIC_MOQT_STREAM_MAP_H + +#include <cstdint> +#include <optional> +#include <vector> + +#include "absl/container/btree_map.h" +#include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/common/platform/api/quiche_export.h" +#include "quiche/web_transport/web_transport.h" + +namespace moqt { + +// A map of outgoing data streams indexed by object sequence numbers. +class QUICHE_EXPORT SendStreamMap { + public: + SendStreamMap() = default; + + std::optional<webtransport::StreamId> GetStreamFor( + DataStreamIndex index) const; + void AddStream(DataStreamIndex index, webtransport::StreamId stream_id); + void RemoveStream(DataStreamIndex index); + std::vector<webtransport::StreamId> GetAllStreams() const; + std::vector<webtransport::StreamId> GetStreamsForGroup( + uint64_t group_id) const; + + private: + absl::btree_map<DataStreamIndex, webtransport::StreamId> send_streams_; +}; + +} // namespace moqt + +#endif // QUICHE_QUIC_MOQT_STREAM_MAP_H
diff --git a/quiche/quic/moqt/moqt_stream_map_test.cc b/quiche/quic/moqt/moqt_stream_map_test.cc new file mode 100644 index 0000000..db99d5e --- /dev/null +++ b/quiche/quic/moqt/moqt_stream_map_test.cc
@@ -0,0 +1,38 @@ +// Copyright 2023 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "quiche/quic/moqt/moqt_stream_map.h" + +#include <optional> + +#include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/platform/api/quic_test.h" +#include "quiche/common/platform/api/quiche_expect_bug.h" +#include "quiche/common/platform/api/quiche_export.h" + +namespace moqt { + +namespace test { + +class QUICHE_EXPORT StreamMapTest : public quic::test::QuicTest { + public: + StreamMapTest() {} +}; + +TEST_F(StreamMapTest, AddQueryRemoveStreamIdSubgroup) { + SendStreamMap stream_map; + stream_map.AddStream(DataStreamIndex{4, 0}, 2); + EXPECT_EQ(stream_map.GetStreamFor(DataStreamIndex(5, 0)), std::nullopt); + stream_map.AddStream(DataStreamIndex{5, 0}, 6); + stream_map.AddStream(DataStreamIndex{5, 1}, 7); + EXPECT_QUICHE_BUG(stream_map.AddStream(DataStreamIndex{5, 0}, 6), + "Stream already added"); + EXPECT_EQ(stream_map.GetStreamFor(DataStreamIndex(4, 0)), 2); + stream_map.RemoveStream(DataStreamIndex{5, 1}); + EXPECT_EQ(stream_map.GetStreamFor(DataStreamIndex(5, 1)), std::nullopt); +} + +} // namespace test + +} // namespace moqt
diff --git a/quiche/quic/moqt/moqt_subscribe_windows.h b/quiche/quic/moqt/moqt_subscribe_windows.h deleted file mode 100644 index 431e9f1..0000000 --- a/quiche/quic/moqt/moqt_subscribe_windows.h +++ /dev/null
@@ -1,75 +0,0 @@ -// Copyright 2023 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#ifndef QUICHE_QUIC_MOQT_SUBSCRIBE_WINDOWS_H -#define QUICHE_QUIC_MOQT_SUBSCRIBE_WINDOWS_H - -#include <cstdint> -#include <optional> -#include <vector> - -#include "absl/container/btree_map.h" -#include "quiche/quic/moqt/moqt_messages.h" -#include "quiche/common/platform/api/quiche_export.h" -#include "quiche/web_transport/web_transport.h" - -namespace moqt { - -// SubscribeWindow represents a window of objects for which an MoQT subscription -// can be valid. -class QUICHE_EXPORT SubscribeWindow { - public: - // Creates a half-open window for SUBSCRIBES. - SubscribeWindow() = default; - SubscribeWindow(Location start) : start_(start) {} - - // Creates a closed window for SUBSCRIBE or FETCH with no end object; - SubscribeWindow(Location start, std::optional<uint64_t> end_group) - : start_(start), - end_(Location(end_group.value_or(UINT64_MAX), UINT64_MAX)) {} - // For FETCH with end object - SubscribeWindow(Location start, Location end) : start_(start), end_(end) {} - - bool InWindow(const Location& seq) const { - return start_ <= seq && seq <= end_; - } - bool GroupInWindow(uint64_t group) const; - Location start() const { return start_; } - Location end() const { return end_; } - - // Updates the subscription window. Returns true if the update is valid (in - // MoQT, subscription windows are only allowed to shrink, not to expand). - // Called only as a result of SUBSCRIBE_OK (largest_id) or REQUEST_UPDATE. - bool TruncateStart(Location start); - // Called only as a result of REQUEST_UPDATE. - bool TruncateEnd(uint64_t end_group); - // Called only as a result of FETCH_OK (largest_id) - bool TruncateEnd(Location largest_id); - - private: - // The subgroups in these sequences have no meaning. - Location start_ = Location(); - Location end_ = Location(UINT64_MAX, UINT64_MAX); -}; - -// A map of outgoing data streams indexed by object sequence numbers. -class QUICHE_EXPORT SendStreamMap { - public: - SendStreamMap() = default; - - std::optional<webtransport::StreamId> GetStreamFor( - DataStreamIndex index) const; - void AddStream(DataStreamIndex index, webtransport::StreamId stream_id); - void RemoveStream(DataStreamIndex index); - std::vector<webtransport::StreamId> GetAllStreams() const; - std::vector<webtransport::StreamId> GetStreamsForGroup( - uint64_t group_id) const; - - private: - absl::btree_map<DataStreamIndex, webtransport::StreamId> send_streams_; -}; - -} // namespace moqt - -#endif // QUICHE_QUIC_MOQT_SUBSCRIBE_WINDOWS_H
diff --git a/quiche/quic/moqt/moqt_subscribe_windows_test.cc b/quiche/quic/moqt/moqt_subscribe_windows_test.cc deleted file mode 100644 index 20cf483..0000000 --- a/quiche/quic/moqt/moqt_subscribe_windows_test.cc +++ /dev/null
@@ -1,66 +0,0 @@ -// Copyright 2023 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#include "quiche/quic/moqt/moqt_subscribe_windows.h" - -#include <cstdint> -#include <optional> - -#include "quiche/quic/moqt/moqt_messages.h" -#include "quiche/quic/platform/api/quic_expect_bug.h" -#include "quiche/quic/platform/api/quic_test.h" -#include "quiche/common/platform/api/quiche_export.h" - -namespace moqt { - -namespace test { - -class QUICHE_EXPORT SubscribeWindowTest : public quic::test::QuicTest { - public: - SubscribeWindowTest() {} - - const uint64_t subscribe_id_ = 2; - const Location start_{4, 0}; - const uint64_t end_ = 5; -}; - -TEST_F(SubscribeWindowTest, Queries) { - SubscribeWindow window(start_, end_); - EXPECT_TRUE(window.InWindow(Location(4, 0))); - EXPECT_TRUE(window.InWindow(Location(5, UINT64_MAX))); - EXPECT_FALSE(window.InWindow(Location(6, 0))); - EXPECT_FALSE(window.InWindow(Location(3, 12))); -} - -TEST_F(SubscribeWindowTest, AddQueryRemoveStreamIdSubgroup) { - SendStreamMap stream_map; - stream_map.AddStream(DataStreamIndex{4, 0}, 2); - EXPECT_EQ(stream_map.GetStreamFor(DataStreamIndex(5, 0)), std::nullopt); - stream_map.AddStream(DataStreamIndex{5, 0}, 6); - stream_map.AddStream(DataStreamIndex{5, 1}, 7); - EXPECT_QUIC_BUG(stream_map.AddStream(DataStreamIndex{5, 0}, 6), - "Stream already added"); - EXPECT_EQ(stream_map.GetStreamFor(DataStreamIndex(4, 0)), 2); - stream_map.RemoveStream(DataStreamIndex{5, 1}); - EXPECT_EQ(stream_map.GetStreamFor(DataStreamIndex(5, 1)), std::nullopt); -} - -TEST_F(SubscribeWindowTest, UpdateStartEnd) { - SubscribeWindow window(start_, end_); - EXPECT_TRUE(window.TruncateStart(start_.Next())); - EXPECT_TRUE(window.TruncateEnd(end_ - 1)); - EXPECT_FALSE(window.InWindow(start_)); - EXPECT_FALSE(window.InWindow(Location(end_, 0))); - // Widens start_ again. - EXPECT_FALSE(window.TruncateStart(start_)); - // Widens end_ again. - EXPECT_FALSE(window.TruncateEnd(end_)); - EXPECT_TRUE(window.TruncateEnd(Location(end_ - 1, 10))); - EXPECT_TRUE(window.InWindow(Location(end_ - 1, 10))); - EXPECT_FALSE(window.InWindow(Location(end_ - 1, 11))); -} - -} // namespace test - -} // namespace moqt
diff --git a/quiche/quic/moqt/moqt_track.cc b/quiche/quic/moqt/moqt_track.cc index 6b5b050..acecdee 100644 --- a/quiche/quic/moqt/moqt_track.cc +++ b/quiche/quic/moqt/moqt_track.cc
@@ -22,6 +22,7 @@ #include "quiche/quic/moqt/moqt_messages.h" #include "quiche/quic/moqt/moqt_object.h" #include "quiche/quic/moqt/moqt_priority.h" +#include "quiche/quic/moqt/moqt_types.h" #include "quiche/common/platform/api/quiche_bug_tracker.h" #include "quiche/common/quiche_buffer_allocator.h" #include "quiche/common/quiche_mem_slice.h" @@ -150,7 +151,12 @@ auto task = std::make_unique<UpstreamFetchTask>(largest_location, status, std::move(callback)); task_ = task->weak_ptr(); - window_.TruncateEnd(largest_location); + if (relative_groups_.has_value() && + (*relative_groups_ < largest_location.group)) { + start_ = Location(largest_location.group - *relative_groups_, 0); + relative_groups_.reset(); + } + end_ = std::min(end_, largest_location); std::move(ok_callback_)(std::move(task)); if (can_read_callback_) { task_.GetIfAvailable()->set_can_read_callback(
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h index 7bef001..5075b33 100644 --- a/quiche/quic/moqt/moqt_track.h +++ b/quiche/quic/moqt/moqt_track.h
@@ -21,7 +21,7 @@ #include "quiche/quic/moqt/moqt_object.h" #include "quiche/quic/moqt/moqt_priority.h" #include "quiche/quic/moqt/moqt_session_interface.h" -#include "quiche/quic/moqt/moqt_subscribe_windows.h" +#include "quiche/quic/moqt/moqt_types.h" #include "quiche/common/quiche_buffer_allocator.h" #include "quiche/common/quiche_callbacks.h" #include "quiche/common/quiche_weak_ptr.h" @@ -197,8 +197,8 @@ : RemoteTrack(standalone.full_track_name, fetch.request_id), group_order_(fetch.parameters.group_order.value_or( MoqtDeliveryOrder::kAscending)), - window_(SubscribeWindow(standalone.start_location, - standalone.end_location)), + start_(standalone.start_location), + end_(standalone.end_location), subscriber_priority_(fetch.parameters.subscriber_priority.value_or( kDefaultSubscriberPriority)), ok_callback_(std::move(callback)) {} @@ -208,7 +208,8 @@ : RemoteTrack(full_track_name, fetch.request_id), group_order_(fetch.parameters.group_order.value_or( MoqtDeliveryOrder::kAscending)), - window_(SubscribeWindow(Location(0, 0))), + relative_groups_( + std::get<JoiningFetchRelative>(fetch.fetch).joining_start), subscriber_priority_(fetch.parameters.subscriber_priority.value_or( kDefaultSubscriberPriority)), ok_callback_(std::move(callback)) {} @@ -219,7 +220,7 @@ : RemoteTrack(full_track_name, fetch.request_id), group_order_(fetch.parameters.group_order.value_or( MoqtDeliveryOrder::kAscending)), - window_(SubscribeWindow(Location(absolute_joining.joining_start, 0))), + start_(Location(absolute_joining.joining_start, 0)), subscriber_priority_(fetch.parameters.subscriber_priority.value_or( kDefaultSubscriberPriority)), ok_callback_(std::move(callback)) {} @@ -227,7 +228,7 @@ ~UpstreamFetch(); bool InWindow(Location location) const override { - return (window_.InWindow(location)); + return (location >= start_ && location <= end_); } MoqtPriority subscriber_priority() const override { @@ -338,7 +339,9 @@ private: MoqtDeliveryOrder group_order_; - SubscribeWindow window_; + Location start_ = Location(0, 0); + Location end_ = Location(kMaxGroupId, kMaxObjectId); + std::optional<uint64_t> relative_groups_; MoqtPriority subscriber_priority_; // The last object received on the stream. std::optional<Location> last_location_;
diff --git a/quiche/quic/moqt/moqt_track_test.cc b/quiche/quic/moqt/moqt_track_test.cc index 618739c..134906f 100644 --- a/quiche/quic/moqt/moqt_track_test.cc +++ b/quiche/quic/moqt/moqt_track_test.cc
@@ -16,6 +16,7 @@ #include "quiche/quic/moqt/moqt_names.h" #include "quiche/quic/moqt/moqt_object.h" #include "quiche/quic/moqt/moqt_priority.h" +#include "quiche/quic/moqt/moqt_types.h" #include "quiche/quic/moqt/test_tools/moqt_mock_visitor.h" #include "quiche/quic/platform/api/quic_test.h" #include "quiche/common/quiche_mem_slice.h" @@ -270,6 +271,38 @@ MoqtObjectStatus::kEndOfTrack, true)); } +TEST_F(UpstreamFetchTest, RelativeJoiningFetch) { + MoqtFetch relative_fetch_message = { + /*request_id=*/2, + JoiningFetchRelative(1, 2), + MessageParameters(), + }; + UpstreamFetch relative_fetch(relative_fetch_message, + FullTrackName("foo", "bar"), + [&](std::unique_ptr<MoqtFetchTask> task) { + fetch_task_ = std::move(task); + }); + relative_fetch.OnFetchResult(Location(10, 50), absl::OkStatus(), nullptr); + EXPECT_FALSE(relative_fetch.InWindow(Location(7, 35))); + EXPECT_TRUE(relative_fetch.InWindow(Location(8, 0))); +} + +TEST_F(UpstreamFetchTest, RelativeJoiningFetchUnderflow) { + MoqtFetch relative_fetch_message = { + /*request_id=*/2, + JoiningFetchRelative(1, 10), + MessageParameters(), + }; + UpstreamFetch relative_fetch(relative_fetch_message, + FullTrackName("foo", "bar"), + [&](std::unique_ptr<MoqtFetchTask> task) { + fetch_task_ = std::move(task); + }); + relative_fetch.OnFetchResult(Location(1, 50), absl::OkStatus(), nullptr); + EXPECT_TRUE(relative_fetch.InWindow(Location(0, 0))); + EXPECT_TRUE(relative_fetch.InWindow(Location(1, 50))); +} + } // namespace test } // namespace moqt