Integrate reordering into the bitrate adjuster algorithm. Also tweak some magic numbers, including the flow control windows. PiperOrigin-RevId: 834798752
diff --git a/quiche/quic/moqt/moqt_bitrate_adjuster.cc b/quiche/quic/moqt/moqt_bitrate_adjuster.cc index 58079c4..3a99e6f 100644 --- a/quiche/quic/moqt/moqt_bitrate_adjuster.cc +++ b/quiche/quic/moqt/moqt_bitrate_adjuster.cc
@@ -4,7 +4,6 @@ #include "quiche/quic/moqt/moqt_bitrate_adjuster.h" -#include <cstdint> #include <cstdlib> #include <optional> @@ -32,31 +31,61 @@ return; } start_time_ = clock_->Now(); + outstanding_objects_.emplace( + /*max_out_of_order_objects=*/parameters_.max_out_of_order_objects); } void MoqtBitrateAdjuster::OnObjectAckReceived( - Location /*location*/, QuicTimeDelta delta_from_deadline) { - if (!start_time_.IsInitialized()) { + Location location, QuicTimeDelta delta_from_deadline) { + if (!start_time_.IsInitialized() || !outstanding_objects_.has_value()) { return; } - const QuicTime earliest_action_time = start_time_ + parameters_.initial_delay; - if (clock_->Now() < earliest_action_time) { + // Update the state. + int reordering_delta = outstanding_objects_->OnObjectAcked(location); + + // Decide whether to act based on the latest signal. + if (!ShouldUseAckAsActionSignal(location)) { return; } - - if (delta_from_deadline < QuicTimeDelta::Zero()) { - // While adjusting down upon the first sign of packets getting late might - // seem aggressive, note that: - // - By the time user occurs, this is already a user-visible issue (so, in - // some sense, this isn't aggressive enough). - // - The adjustment won't happen if we're already bellow `k * max_bw`, so - // if the delays are due to other factors like bufferbloat, the measured - // bandwidth will likely not result in a downwards adjustment. + if (ShouldAttemptAdjustingDown(reordering_delta, delta_from_deadline)) { AttemptAdjustingDown(); } } +bool MoqtBitrateAdjuster::ShouldUseAckAsActionSignal(Location location) { + // Allow for some time to pass for the connection to reach the point at which + // the rate adaptation signals can become useful. + const QuicTime earliest_action_time = start_time_ + parameters_.initial_delay; + const bool too_early_in_the_connection = clock_->Now() < earliest_action_time; + + // Ignore out-of-order acks for the purpose of deciding whether to adjust up + // or down. Generally, if an ack is out of order, the bitrate adjuster has + // already reacted to the later object appropriately. + const bool is_out_of_order_ack = location < last_acked_object_; + last_acked_object_ = location; + + return !too_early_in_the_connection && !is_out_of_order_ack; +} + +bool MoqtBitrateAdjuster::ShouldAttemptAdjustingDown( + int reordering_delta, quic::QuicTimeDelta delta_from_deadline) const { + const bool has_exceeded_max_out_of_order = + reordering_delta > parameters_.max_out_of_order_objects; + QUICHE_DLOG_IF(INFO, has_exceeded_max_out_of_order) + << "Adjusting connection down due to reordering, delta: " + << reordering_delta; + + const bool time_delta_too_close = + delta_from_deadline < parameters_.adjust_down_threshold * time_window_; + QUICHE_DLOG_IF(INFO, time_delta_too_close) + << "Adjusting connection down due to object arriving too late, time " + "delta: " + << delta_from_deadline; + + return has_exceeded_max_out_of_order || time_delta_too_close; +} + void MoqtBitrateAdjuster::AttemptAdjustingDown() { webtransport::SessionStats stats = session_->GetSessionStats(); QuicBandwidth target_bandwidth = @@ -109,4 +138,11 @@ trace_recorder_.RecordTargetBitrateSet(bitrate); } +void MoqtBitrateAdjuster::OnNewObjectEnqueued(Location location) { + if (!start_time_.IsInitialized() || !outstanding_objects_.has_value()) { + return; + } + outstanding_objects_->OnObjectAdded(location); +} + } // namespace moqt
diff --git a/quiche/quic/moqt/moqt_bitrate_adjuster.h b/quiche/quic/moqt/moqt_bitrate_adjuster.h index 050e696..917bbcf 100644 --- a/quiche/quic/moqt/moqt_bitrate_adjuster.h +++ b/quiche/quic/moqt/moqt_bitrate_adjuster.h
@@ -5,13 +5,13 @@ #ifndef QUICHE_QUIC_MOQT_MOQT_BITRATE_ADJUSTER_H_ #define QUICHE_QUIC_MOQT_MOQT_BITRATE_ADJUSTER_H_ -#include <cstdint> #include <optional> #include "quiche/quic/core/quic_bandwidth.h" #include "quiche/quic/core/quic_clock.h" #include "quiche/quic/core/quic_time.h" #include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/moqt/moqt_outstanding_objects.h" #include "quiche/quic/moqt/moqt_session.h" #include "quiche/quic/moqt/moqt_trace_recorder.h" #include "quiche/web_transport/web_transport.h" @@ -55,11 +55,22 @@ // When bitrate is adjusted down, multiply the congestion controller estimate // by this factor. This should be less than 1, since congestion controller // estimate tends to be overly optimistic in practice. - float target_bitrate_multiplier_down = 0.95f; + float target_bitrate_multiplier_down = 0.9f; // Do not perform any updates within `initial_delay` after the connection // start. quic::QuicTimeDelta initial_delay = quic::QuicTimeDelta::FromSeconds(2); + + // If the object arrives too close to the deadline, the bitrate will be + // adjusted down. The threshold is expressed as a fraction of `time_window` + // (which typically would be equal to the size of the buffer in seconds). + float adjust_down_threshold = 0.1f; + + // The maximum gap between the next object expected to be received, and the + // actually received object, expressed as a number of objects. + // + // The default is 12, which corresponds to about 400ms for 30fps video. + int max_out_of_order_objects = 12; }; // MoqtBitrateAdjuster monitors the progress of delivery for a single track, and @@ -74,14 +85,24 @@ // MoqtPublishingMonitorInterface implementation. void OnObjectAckSupportKnown( std::optional<quic::QuicTimeDelta> time_window) override; + void OnNewObjectEnqueued(Location location) override; void OnObjectAckReceived(Location location, quic::QuicTimeDelta delta_from_deadline) override; MoqtTraceRecorder& trace_recorder() { return trace_recorder_; } + MoqtBitrateAdjusterParameters& parameters() { return parameters_; } private: void Start(); + // Checks if the bitrate adjuster should react to an individual ack. + bool ShouldUseAckAsActionSignal(Location location); + + // Checks if the bitrate should be adjusted down based on the result of + // processing an object ACK. + bool ShouldAttemptAdjustingDown( + int reordering_delta, quic::QuicTimeDelta delta_from_deadline) const; + // Attempts adjusting the bitrate down. void AttemptAdjustingDown(); @@ -93,8 +114,16 @@ BitrateAdjustable* adjustable_; // Not owned. MoqtTraceRecorder trace_recorder_; MoqtBitrateAdjusterParameters parameters_; + + // The time at which Start() has been called. quic::QuicTime start_time_ = quic::QuicTime::Zero(); + + // The window size received from the peer. This amount is used to establish + // the scale for incoming time deltas in the object ACKs. quic::QuicTimeDelta time_window_ = quic::QuicTimeDelta::Zero(); + + std::optional<MoqtOutstandingObjects> outstanding_objects_; + Location last_acked_object_; }; // Given a suggestion to change bitrate `old_bitrate` to `new_bitrate` with the
diff --git a/quiche/quic/moqt/moqt_bitrate_adjuster_test.cc b/quiche/quic/moqt/moqt_bitrate_adjuster_test.cc index 95be2b2..aed88ce 100644 --- a/quiche/quic/moqt/moqt_bitrate_adjuster_test.cc +++ b/quiche/quic/moqt/moqt_bitrate_adjuster_test.cc
@@ -67,16 +67,23 @@ MoqtBitrateAdjuster adjuster_; }; +TEST_F(MoqtBitrateAdjusterTest, IgnoreCallsBeforeStart) { + MoqtBitrateAdjuster uninitialized_adjuster(&clock_, &session_, &adjustable_); + uninitialized_adjuster.OnNewObjectEnqueued(Location(1, 0)); + uninitialized_adjuster.OnObjectAckReceived( + Location(1, 0), QuicTimeDelta::FromMilliseconds(100)); +} + TEST_F(MoqtBitrateAdjusterTest, SteadyState) { // The fact that estimated bitrate is 1bps should not matter, since we never // have a reason to adjust down. stats_.estimated_send_rate_bps = 1; - EXPECT_CALL(adjustable_, OnBitrateAdjusted(_)).Times(0); + EXPECT_CALL(adjustable_, OnBitrateAdjusted).Times(0); for (int i = 0; i < 250; ++i) { clock_.AdvanceTime(kDefaultRtt); for (int j = 0; j < 10; ++j) { - adjuster_.OnObjectAckReceived(Location(i, j), kDefaultRtt * 2); + adjuster_.OnObjectAckReceived(Location(i, j), kDefaultTimeScale * 0.9); } } } @@ -117,6 +124,38 @@ EXPECT_EQ(adjusted_times, 2); } +TEST_F(MoqtBitrateAdjusterTest, OutOfOrderAckIgnored) { + int adjusted_times = 0; + EXPECT_CALL(adjustable_, OnBitrateAdjusted).WillRepeatedly([&] { + ++adjusted_times; + }); + + clock_.AdvanceTime(100 * kDefaultRtt); + stats_.estimated_send_rate_bps = (0.5 * kDefaultBitrate).ToBitsPerSecond(); + adjuster_.OnObjectAckReceived(Location(0, 1), + QuicTimeDelta::FromMilliseconds(-1)); + EXPECT_EQ(adjusted_times, 1); + + clock_.AdvanceTime(100 * kDefaultRtt); + stats_.estimated_send_rate_bps = (0.25 * kDefaultBitrate).ToBitsPerSecond(); + adjuster_.OnObjectAckReceived(Location(0, 0), + QuicTimeDelta::FromMilliseconds(-1)); + EXPECT_EQ(adjusted_times, 1); +} + +TEST_F(MoqtBitrateAdjusterTest, Reordering) { + adjuster_.parameters().max_out_of_order_objects = 1; + clock_.AdvanceTime(100 * kDefaultRtt); + stats_.estimated_send_rate_bps = (0.5 * kDefaultBitrate).ToBitsPerSecond(); + + adjuster_.OnNewObjectEnqueued(Location(0, 0)); + adjuster_.OnNewObjectEnqueued(Location(0, 1)); + adjuster_.OnNewObjectEnqueued(Location(0, 2)); + + EXPECT_CALL(adjustable_, OnBitrateAdjusted); + adjuster_.OnObjectAckReceived(Location(0, 2), kDefaultTimeScale); +} + TEST_F(MoqtBitrateAdjusterTest, ShouldIgnoreBitrateAdjustment) { constexpr quic::QuicBandwidth kOldBandwith = quic::QuicBandwidth::FromKBitsPerSecond(1024);
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc index 046eab9..a887b43 100644 --- a/quiche/quic/moqt/moqt_integration_test.cc +++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -714,10 +714,12 @@ MoqtKnownTrackPublisher publisher; server_->session()->set_publisher(&publisher); - auto track_publisher = std::make_shared<MockTrackPublisher>(full_track_name); + auto track_publisher = std::make_shared<MoqtOutgoingQueue>( + full_track_name, MoqtForwardingPreference::kSubgroup, + test_harness_.simulator().GetClock()); publisher.Add(track_publisher); - MockPublishingMonitorInterface monitoring; + testing::StrictMock<MockPublishingMonitorInterface> monitoring; server_->session()->SetMonitoringInterfaceForTrack(full_track_name, &monitoring); @@ -726,10 +728,6 @@ .WillOnce([&](MoqtObjectAckFunction new_ack_function) { ack_function = std::move(new_ack_function); }); - EXPECT_CALL(*track_publisher, AddObjectListener) - .WillOnce([&](MoqtObjectListener* listener) { - listener->OnSubscribeAccepted(); - }); EXPECT_CALL(subscribe_visitor_, OnReply) .WillOnce([&](const FullTrackName&, std::variant<SubscribeOkData, MoqtRequestError>) { @@ -739,10 +737,6 @@ VersionSpecificParameters parameters; parameters.oack_window_size = quic::QuicTimeDelta::FromMilliseconds(100); - ON_CALL(*track_publisher, expiration) - .WillByDefault(Return(quic::QuicTimeDelta::Zero())); - ON_CALL(*track_publisher, delivery_order) - .WillByDefault(Return(MoqtDeliveryOrder::kAscending)); client_->session()->SubscribeCurrentObject(full_track_name, &subscribe_visitor_, parameters); EXPECT_CALL(monitoring, OnObjectAckSupportKnown(parameters.oack_window_size)); @@ -757,6 +751,9 @@ bool success = test_harness_.RunUntilWithDefaultTimeout([&] { return done; }); EXPECT_TRUE(success); + EXPECT_CALL(monitoring, OnNewObjectEnqueued(Location(0, 0))); + track_publisher->AddObject(QuicheMemSlice::Copy("test"), true); + const quic_trace::Trace& trace = *server_->trace_visitor()->trace(); std::vector<int64_t> ack_deltas; for (const quic_trace::Event& event : trace.events()) {
diff --git a/quiche/quic/moqt/moqt_quic_config.cc b/quiche/quic/moqt/moqt_quic_config.cc index 415eba4..8cefd5f 100644 --- a/quiche/quic/moqt/moqt_quic_config.cc +++ b/quiche/quic/moqt/moqt_quic_config.cc
@@ -22,7 +22,7 @@ // A typical I-frame in a high-bitrate FHD video tends to be in the low 100 KiB // range. Even for a higher-latency connection such as 100ms, that would imply // an instantaneous bitrate of 10 Mbps. -constexpr QuicByteCount kDefaultInitialStreamWindow = 128 * 1024; +constexpr QuicByteCount kDefaultInitialStreamWindow = 512 * 1024; // The flow control window autotuning does work with connection-level flow // control, but we still can make the startup smoother by setting a more @@ -30,7 +30,7 @@ // much more than a single data stream at a time, since for most of the MOQT // users the bandwidth usage would be dominated by a single track. constexpr QuicByteCount kDefaultInitialConnectionWindow = - 2 * kDefaultInitialStreamWindow; + 3 * kDefaultInitialStreamWindow; } // namespace
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc index 6b602c0..7f8abeb 100644 --- a/quiche/quic/moqt/moqt_session.cc +++ b/quiche/quic/moqt/moqt_session.cc
@@ -2093,8 +2093,26 @@ if (!InWindow(location)) { return; } + + if (monitoring_interface_ != nullptr) { + // Notify the monitoring interface about all newly published normal objects. + // Objects with other statuses are not guaranteed to be acknowledged, thus + // passing them into the monitoring interface can lead to confusion. + std::optional<PublishedObject> object = track_publisher_->GetCachedObject( + location.group, subgroup, location.object); + QUICHE_DCHECK(object.has_value()) + << "Object " << absl::StrCat(location) << " on track " + << track_publisher_->GetTrackName().ToString() + << " does not exist, despite OnNewObjectAvailable being called"; + if (object.has_value() && object->metadata.location == location && + object->metadata.status == MoqtObjectStatus::kNormal) { + monitoring_interface_->OnNewObjectEnqueued(location); + } + } + session_->trace_recorder_.RecordNewObjectAvaliable( track_alias_, *track_publisher_, location, subgroup, publisher_priority); + DataStreamIndex index(location.group, subgroup); if (reset_subgroups_.contains(index)) { // This subgroup has already been reset, ignore.
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h index 90e8bc2..859123a 100644 --- a/quiche/quic/moqt/moqt_session.h +++ b/quiche/quic/moqt/moqt_session.h
@@ -66,6 +66,7 @@ virtual void OnObjectAckSupportKnown( std::optional<quic::QuicTimeDelta> time_window) = 0; + virtual void OnNewObjectEnqueued(Location location) = 0; virtual void OnObjectAckReceived(Location location, quic::QuicTimeDelta delta_from_deadline) = 0; };
diff --git a/quiche/quic/moqt/test_tools/moqt_mock_visitor.h b/quiche/quic/moqt/test_tools/moqt_mock_visitor.h index 433f4a9..b34f7e4 100644 --- a/quiche/quic/moqt/test_tools/moqt_mock_visitor.h +++ b/quiche/quic/moqt/test_tools/moqt_mock_visitor.h
@@ -211,6 +211,7 @@ public: MOCK_METHOD(void, OnObjectAckSupportKnown, (std::optional<quic::QuicTimeDelta> time_window), (override)); + MOCK_METHOD(void, OnNewObjectEnqueued, (Location location), (override)); MOCK_METHOD(void, OnObjectAckReceived, (Location location, quic::QuicTimeDelta delta_from_deadline), (override));
diff --git a/quiche/quic/moqt/test_tools/moqt_simulator_test.cc b/quiche/quic/moqt/test_tools/moqt_simulator_test.cc index f09e973..0d37eab 100644 --- a/quiche/quic/moqt/test_tools/moqt_simulator_test.cc +++ b/quiche/quic/moqt/test_tools/moqt_simulator_test.cc
@@ -61,14 +61,10 @@ MoqtSimulator simulator(parameters); simulator.Run(); EXPECT_GE(simulator.received_on_time_fraction(), 0.8f); - EXPECT_LT(simulator.received_on_time_fraction(), 0.99f); - // TODO(vasilvv): re-enable once rate adaptation works well enough so that - // this is no longer flaky. -#if 0 + EXPECT_LT(simulator.received_on_time_fraction(), 0.999f); EXPECT_GT(CountEventType(simulator.client_trace(), EventType::MOQT_TARGET_BITRATE_SET), 0); -#endif quic::QuicConnectionStats stats = simulator.client_quic_session()->connection()->GetStats();