Implement bandwidth probing in MoQ bitrate adjuster. This CL introduces various level of "connection quality" for MoQT connection, and adjust bitrate and bandwidth probes based on the quality level. PiperOrigin-RevId: 855310781
diff --git a/quiche/quic/moqt/moqt_bitrate_adjuster.cc b/quiche/quic/moqt/moqt_bitrate_adjuster.cc index 3a99e6f..37990cf 100644 --- a/quiche/quic/moqt/moqt_bitrate_adjuster.cc +++ b/quiche/quic/moqt/moqt_bitrate_adjuster.cc
@@ -4,12 +4,21 @@ #include "quiche/quic/moqt/moqt_bitrate_adjuster.h" +#include <algorithm> #include <cstdlib> +#include <memory> #include <optional> +#include <string> +#include <utility> +#include "absl/functional/bind_front.h" +#include "absl/time/time.h" +#include "quiche/quic/core/quic_alarm_factory.h" #include "quiche/quic/core/quic_bandwidth.h" #include "quiche/quic/core/quic_time.h" +#include "quiche/quic/core/quic_types.h" #include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/moqt/moqt_probe_manager.h" #include "quiche/common/platform/api/quiche_bug_tracker.h" #include "quiche/common/platform/api/quiche_logging.h" #include "quiche/web_transport/web_transport.h" @@ -19,20 +28,41 @@ namespace { using ::quic::QuicBandwidth; +using ::quic::QuicByteCount; using ::quic::QuicTime; using ::quic::QuicTimeDelta; } // namespace +MoqtBitrateAdjuster::MoqtBitrateAdjuster( + const quic::QuicClock* clock, webtransport::Session* session, + std::unique_ptr<MoqtProbeManagerInterface> probe_manager, + BitrateAdjustable* adjustable) + : clock_(clock), + session_(session), + adjustable_(adjustable), + probe_manager_(std::move(probe_manager)) {} + +MoqtBitrateAdjuster::MoqtBitrateAdjuster(const quic::QuicClock* clock, + webtransport::Session* session, + quic::QuicAlarmFactory* alarm_factory, + BitrateAdjustable* adjustable) + : MoqtBitrateAdjuster(clock, session, + std::make_unique<MoqtProbeManager>( + session, clock, *alarm_factory, &trace_recorder_), + adjustable) {} + void MoqtBitrateAdjuster::Start() { if (start_time_.IsInitialized()) { QUICHE_BUG(MoqtBitrateAdjuster_double_init) << "MoqtBitrateAdjuster::Start() called more than once."; return; } + start_time_ = clock_->Now(); outstanding_objects_.emplace( - /*max_out_of_order_objects=*/parameters_.max_out_of_order_objects); + /*max_out_of_order_objects=*/parameters_ + .quality_level_reordering_thresholds[0]); } void MoqtBitrateAdjuster::OnObjectAckReceived( @@ -40,6 +70,7 @@ if (!start_time_.IsInitialized() || !outstanding_objects_.has_value()) { return; } + const QuicTime now = clock_->Now(); // Update the state. int reordering_delta = outstanding_objects_->OnObjectAcked(location); @@ -48,9 +79,25 @@ if (!ShouldUseAckAsActionSignal(location)) { return; } - if (ShouldAttemptAdjustingDown(reordering_delta, delta_from_deadline)) { + const ConnectionQualityLevel quality_level = + GetQualityLevel(reordering_delta, delta_from_deadline); + + if (quality_level == ConnectionQualityLevel::kStable) { + if (!in_stable_state_since_.has_value()) { + in_stable_state_since_ = now; + } + } else { + in_stable_state_since_.reset(); + } + + if (quality_level <= ConnectionQualityLevel::kVeryPrecarious) { + probe_manager_->StopProbe(); + } + + if (quality_level <= ConnectionQualityLevel::kCritical) { AttemptAdjustingDown(); } + MaybeStartProbing(now); } bool MoqtBitrateAdjuster::ShouldUseAckAsActionSignal(Location location) { @@ -68,22 +115,34 @@ return !too_early_in_the_connection && !is_out_of_order_ack; } -bool MoqtBitrateAdjuster::ShouldAttemptAdjustingDown( +ConnectionQualityLevel MoqtBitrateAdjuster::GetQualityLevel( int reordering_delta, quic::QuicTimeDelta delta_from_deadline) const { - const bool has_exceeded_max_out_of_order = - reordering_delta > parameters_.max_out_of_order_objects; - QUICHE_DLOG_IF(INFO, has_exceeded_max_out_of_order) - << "Adjusting connection down due to reordering, delta: " - << reordering_delta; + for (int i = 0; i < kNumQualityLevels - 1; ++i) { + const ConnectionQualityLevel level = static_cast<ConnectionQualityLevel>(i); - const bool time_delta_too_close = - delta_from_deadline < parameters_.adjust_down_threshold * time_window_; - QUICHE_DLOG_IF(INFO, time_delta_too_close) - << "Adjusting connection down due to object arriving too late, time " - "delta: " - << delta_from_deadline; + const int max_out_of_order_objects = + parameters_.quality_level_reordering_thresholds[i]; + const bool has_exceeded_max_out_of_order = + reordering_delta > max_out_of_order_objects; + if (has_exceeded_max_out_of_order) { + QUICHE_DLOG(INFO) << "Downgrading connection quality down to " << level + << " due to reordering, delta: " << reordering_delta; + return level; + } - return has_exceeded_max_out_of_order || time_delta_too_close; + const float time_delta_threshold = + parameters_.quality_level_time_thresholds[i]; + const bool time_delta_too_close = + delta_from_deadline < time_delta_threshold * time_window_; + if (time_delta_too_close) { + QUICHE_DLOG(INFO) << "Downgrading connection quality down to " << level + << "due to object arriving too late, time delta: " + << delta_from_deadline; + return level; + } + } + + return ConnectionQualityLevel::kStable; } void MoqtBitrateAdjuster::AttemptAdjustingDown() { @@ -145,4 +204,104 @@ outstanding_objects_->OnObjectAdded(location); } +void MoqtBitrateAdjuster::MaybeStartProbing(QuicTime now) { + if (!in_stable_state_since_.has_value() || probe_manager_->HasActiveProbe() || + !adjustable_->CouldUseExtraBandwidth()) { + return; + } + + const QuicTime start_probe_after = + *in_stable_state_since_ + parameters_.time_before_probing; + if (now < start_probe_after) { + return; + } + + const webtransport::SessionStats stats = session_->GetSessionStats(); + const QuicBandwidth current_bandwidth = + std::min(adjustable_->GetCurrentBitrate(), + QuicBandwidth::FromBitsPerSecond(stats.estimated_send_rate_bps)); + const QuicBandwidth target_bandwidth = + parameters_.probe_increase_target * current_bandwidth; + // Approximation of PTO as defined in RFC 9002, Section 6.2.1. + const QuicTimeDelta pto(stats.smoothed_rtt + 4 * stats.rtt_variation); + const QuicByteCount probe_size = + parameters_.round_trips_for_probe * pto * target_bandwidth; + const QuicTimeDelta probe_timeout = + current_bandwidth.TransferTime(probe_size) + pto; + std::optional<ProbeId> probe_id = probe_manager_->StartProbe( + probe_size, probe_timeout, + absl::bind_front(&MoqtBitrateAdjuster::OnProbeResult, this, stats)); + + if (!probe_id.has_value()) { + QUICHE_DLOG(WARNING) + << "Failed to create a new probe. Most likely blocked by flow control."; + in_stable_state_since_.reset(); + } +} + +void MoqtBitrateAdjuster::OnProbeResult( + const webtransport::SessionStats& old_stats, const ProbeResult& result) { + // Clear the timer before the next probe regardless of what we do with the + // results of this one. + in_stable_state_since_.reset(); + + // It is possible for the probe to be cancelled due to poor connection status + // (among other reasons). In that case, we shouldn't attempt to increase the + // bitrate. + if (result.status == ProbeStatus::kAborted) { + return; + } + + // While the probe can also be timed out, it is still possible to get a useful + // bandwidth increase from a timed-out probe. Check for a probe duration + // threshold instead. + const webtransport::SessionStats stats = session_->GetSessionStats(); + const double probe_duration_in_rtts = + result.time_elapsed.ToMicroseconds() / + absl::ToDoubleMicroseconds(stats.smoothed_rtt); + if (probe_duration_in_rtts < parameters_.min_probe_duration_in_rtts) { + return; + } + + // Use the bandwidth estimate from the congestion controller as the main + // input. + const QuicBandwidth congestion_control_bandwidth = + QuicBandwidth::FromBitsPerSecond(stats.estimated_send_rate_bps); + + // Use the long-term average over the duration of entire probe as the input. + const QuicByteCount acked_bytes = stats.application_bytes_acknowledged - + old_stats.application_bytes_acknowledged; + const QuicBandwidth average_goodput_over_probe_duration = + QuicBandwidth::FromBytesAndTimeDelta(acked_bytes, result.time_elapsed); + // If the probe has lasted N RTTs, assume that it could have accidentally + // included an extra RTT worth of data due to ack aggregation. + const double average_goodput_adjustment = + probe_duration_in_rtts / (probe_duration_in_rtts + 1); + + // Use congestion controller bandwidth with the long-term average as a limit. + const QuicBandwidth target_bandwidth = std::min( + congestion_control_bandwidth * parameters_.target_bitrate_multiplier_up, + average_goodput_adjustment * average_goodput_over_probe_duration); + + QUICHE_DLOG(INFO) << "Adjusting the bitrate up to " << target_bandwidth; + SuggestNewBitrate(target_bandwidth, BitrateAdjustmentType::kUp); +} + +std::string ConnectionQualityLevelToString(ConnectionQualityLevel level) { + switch (level) { + case ConnectionQualityLevel::kCritical: + return "Critical"; + case ConnectionQualityLevel::kVeryPrecarious: + return "VeryPrecarious"; + case ConnectionQualityLevel::kPrecarious: + return "Precarious"; + case ConnectionQualityLevel::kStable: + return "Stable"; + + case ConnectionQualityLevel::kNumLevels: + break; + } + return "[unknown]"; +} + } // namespace moqt
diff --git a/quiche/quic/moqt/moqt_bitrate_adjuster.h b/quiche/quic/moqt/moqt_bitrate_adjuster.h index 917bbcf..1a3aa89 100644 --- a/quiche/quic/moqt/moqt_bitrate_adjuster.h +++ b/quiche/quic/moqt/moqt_bitrate_adjuster.h
@@ -5,13 +5,17 @@ #ifndef QUICHE_QUIC_MOQT_MOQT_BITRATE_ADJUSTER_H_ #define QUICHE_QUIC_MOQT_MOQT_BITRATE_ADJUSTER_H_ +#include <memory> #include <optional> +#include <string> +#include "quiche/quic/core/quic_alarm_factory.h" #include "quiche/quic/core/quic_bandwidth.h" #include "quiche/quic/core/quic_clock.h" #include "quiche/quic/core/quic_time.h" #include "quiche/quic/moqt/moqt_messages.h" #include "quiche/quic/moqt/moqt_outstanding_objects.h" +#include "quiche/quic/moqt/moqt_probe_manager.h" #include "quiche/quic/moqt/moqt_session.h" #include "quiche/quic/moqt/moqt_trace_recorder.h" #include "quiche/web_transport/web_transport.h" @@ -49,6 +53,35 @@ BitrateAdjustmentType type) = 0; }; +// Connection quality levels. Used by the bitrate adjuster to decide when to +// probe up or down. +enum class ConnectionQualityLevel { + // Lowest quality level. Reaching it will result in the bitrate adjuster + // attempting to immediately lower the bitrate. + kCritical, + // At this level, no new bandwidth probes will be started and all existing + // bandwidth probes are cancelled. + kVeryPrecarious, + // At this level, the adjuster will not start any new probes, but all of the + // previous probes will be allowed to continue. This level is separate from + // the previous one to avoid unstable behavior around the boundaries. + kPrecarious, + // Highest quality level. The adjuster will attempt to probe bandwidth if + // necessary. + kStable, + + kNumLevels, +}; + +std::string ConnectionQualityLevelToString(ConnectionQualityLevel level); +template <typename Sink> +void AbslStringify(Sink& sink, ConnectionQualityLevel level) { + sink.Append(ConnectionQualityLevelToString(level)); +} + +inline constexpr int kNumQualityLevels = + static_cast<int>(ConnectionQualityLevel::kNumLevels); + // Parameters (mostly magic numbers) that determine behavior of // MoqtBitrateAdjuster. struct MoqtBitrateAdjusterParameters { @@ -57,20 +90,47 @@ // estimate tends to be overly optimistic in practice. float target_bitrate_multiplier_down = 0.9f; + // Same, but applies for adjusting up. Similar considerations for selecting + // the value apply. + float target_bitrate_multiplier_up = 0.9f; + // Do not perform any updates within `initial_delay` after the connection // start. quic::QuicTimeDelta initial_delay = quic::QuicTimeDelta::FromSeconds(2); - // If the object arrives too close to the deadline, the bitrate will be - // adjusted down. The threshold is expressed as a fraction of `time_window` - // (which typically would be equal to the size of the buffer in seconds). - float adjust_down_threshold = 0.1f; + // Quality level thresholds. If the object arrives with the + // time-before-deadline that is lower than the first number listed here, it + // will result in the connection being assigned the lowest quality level, and + // so on. The thresholds are expressed as a fraction of `time_window` (which + // typically would be equal to the size of the buffer in seconds). + float quality_level_time_thresholds[kNumQualityLevels - 1] = {0.2f, 0.4f, + 0.6f}; // The maximum gap between the next object expected to be received, and the - // actually received object, expressed as a number of objects. + // actually received object, expressed as a number of objects. If the + // reordering gap exceeds the configured threshold, the connection is marked + // as being in that quality level. The thresholds correspond to kCritical, + // kVeryPrecarious, and kPrecarious. // - // The default is 12, which corresponds to about 400ms for 30fps video. - int max_out_of_order_objects = 12; + // The default for the worst-case reordering is 12, which corresponds to about + // 400ms for 30fps video. + int quality_level_reordering_thresholds[kNumQualityLevels - 1] = {12, 6, 3}; + + // Amount of time the connection has to spend in good state before attempting + // to probe for bandwidth. + quic::QuicTimeDelta time_before_probing = quic::QuicTimeDelta::FromSeconds(2); + + // When probing, attempt to increase the bandwidth by the specified factor. + // Used when determining the probe size and timeout. + float probe_increase_target = 1.2; + + // Expected duration of a probe, expressed in the number of round-trips + // necessary. Selecting values below 8 might interact negatively with BBR. + float round_trips_for_probe = 16; + + // Probe results will be ignored if the probe was cancelled before lasting for + // the specified duration. + float min_probe_duration_in_rtts = 4; }; // MoqtBitrateAdjuster monitors the progress of delivery for a single track, and @@ -79,8 +139,12 @@ public: MoqtBitrateAdjuster(const quic::QuicClock* clock, webtransport::Session* session, - BitrateAdjustable* adjustable) - : clock_(clock), session_(session), adjustable_(adjustable) {} + quic::QuicAlarmFactory* alarm_factory, + BitrateAdjustable* adjustable); + MoqtBitrateAdjuster(const quic::QuicClock* clock, + webtransport::Session* session, + std::unique_ptr<MoqtProbeManagerInterface> probe_manager, + BitrateAdjustable* adjustable); // MoqtPublishingMonitorInterface implementation. void OnObjectAckSupportKnown( @@ -100,9 +164,15 @@ // Checks if the bitrate should be adjusted down based on the result of // processing an object ACK. - bool ShouldAttemptAdjustingDown( + ConnectionQualityLevel GetQualityLevel( int reordering_delta, quic::QuicTimeDelta delta_from_deadline) const; + // Starts a bandwidth probe if all the conditions are met. + void MaybeStartProbing(quic::QuicTime now); + // Callback called whenever the bandwidth probe is finished. + void OnProbeResult(const webtransport::SessionStats& old_stats, + const ProbeResult& result); + // Attempts adjusting the bitrate down. void AttemptAdjustingDown(); @@ -122,7 +192,12 @@ // the scale for incoming time deltas in the object ACKs. quic::QuicTimeDelta time_window_ = quic::QuicTimeDelta::Zero(); + // The earliest point at which the connection has been continuously in the + // stable state. Used to time bandwidth probes. + std::optional<quic::QuicTime> in_stable_state_since_; + std::optional<MoqtOutstandingObjects> outstanding_objects_; + std::unique_ptr<MoqtProbeManagerInterface> probe_manager_; Location last_acked_object_; };
diff --git a/quiche/quic/moqt/moqt_bitrate_adjuster_test.cc b/quiche/quic/moqt/moqt_bitrate_adjuster_test.cc index aed88ce..ee5a747 100644 --- a/quiche/quic/moqt/moqt_bitrate_adjuster_test.cc +++ b/quiche/quic/moqt/moqt_bitrate_adjuster_test.cc
@@ -4,9 +4,16 @@ #include "quiche/quic/moqt/moqt_bitrate_adjuster.h" +#include <memory> +#include <optional> +#include <utility> + +#include "absl/time/time.h" #include "quiche/quic/core/quic_bandwidth.h" #include "quiche/quic/core/quic_time.h" +#include "quiche/quic/core/quic_types.h" #include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/moqt/moqt_probe_manager.h" #include "quiche/quic/test_tools/mock_clock.h" #include "quiche/common/platform/api/quiche_test.h" #include "quiche/web_transport/test_tools/mock_web_transport.h" @@ -18,6 +25,7 @@ using ::quic::QuicBandwidth; using ::quic::QuicTimeDelta; using ::testing::_; +using ::testing::Return; // Simple adjustable object that just keeps track of whatever value has been // assigned to it, and has a mock method to notify of it changing. @@ -40,8 +48,18 @@ QuicBandwidth bitrate_; }; +class MockProbeManager : public MoqtProbeManagerInterface { + public: + MOCK_METHOD(std::optional<ProbeId>, StartProbe, + (quic::QuicByteCount probe_size, quic::QuicTimeDelta timeout, + Callback callback), + (override)); + MOCK_METHOD(std::optional<ProbeId>, StopProbe, (), (override)); + MOCK_METHOD(bool, HasActiveProbe, (), (const, override)); +}; + constexpr QuicBandwidth kDefaultBitrate = - QuicBandwidth::FromBitsPerSecond(2000); + QuicBandwidth::FromKBitsPerSecond(2000); constexpr QuicTimeDelta kDefaultRtt = QuicTimeDelta::FromMilliseconds(20); constexpr QuicTimeDelta kDefaultTimeScale = QuicTimeDelta::FromSeconds(1); @@ -49,7 +67,7 @@ protected: MoqtBitrateAdjusterTest() : adjustable_(kDefaultBitrate), - adjuster_(&clock_, &session_, &adjustable_) { + adjuster_(&clock_, &session_, CreateProbeManager(), &adjustable_) { stats_.min_rtt = stats_.smoothed_rtt = kDefaultRtt.ToAbsl(); stats_.estimated_send_rate_bps = (1.2 * kDefaultBitrate).ToBitsPerSecond(); ON_CALL(session_, GetSessionStats()).WillByDefault([this] { @@ -60,15 +78,24 @@ adjuster_.OnObjectAckSupportKnown(kDefaultTimeScale); } + std::unique_ptr<MoqtProbeManagerInterface> CreateProbeManager() { + auto manager = std::make_unique<MockProbeManager>(); + ON_CALL(*manager, HasActiveProbe).WillByDefault(Return(false)); + probe_manager_ = manager.get(); + return manager; + } + MockBitrateAdjustable adjustable_; webtransport::SessionStats stats_; quic::MockClock clock_; webtransport::test::MockSession session_; + MockProbeManager* probe_manager_ = nullptr; MoqtBitrateAdjuster adjuster_; }; TEST_F(MoqtBitrateAdjusterTest, IgnoreCallsBeforeStart) { - MoqtBitrateAdjuster uninitialized_adjuster(&clock_, &session_, &adjustable_); + MoqtBitrateAdjuster uninitialized_adjuster( + &clock_, &session_, CreateProbeManager(), &adjustable_); uninitialized_adjuster.OnNewObjectEnqueued(Location(1, 0)); uninitialized_adjuster.OnObjectAckReceived( Location(1, 0), QuicTimeDelta::FromMilliseconds(100)); @@ -80,6 +107,7 @@ stats_.estimated_send_rate_bps = 1; EXPECT_CALL(adjustable_, OnBitrateAdjusted).Times(0); + EXPECT_CALL(*probe_manager_, StartProbe).WillRepeatedly(Return(std::nullopt)); for (int i = 0; i < 250; ++i) { clock_.AdvanceTime(kDefaultRtt); for (int j = 0; j < 10; ++j) { @@ -88,6 +116,176 @@ } } +TEST_F(MoqtBitrateAdjusterTest, ProbeUp) { + stats_.min_rtt = kDefaultRtt.ToAbsl(); + stats_.smoothed_rtt = kDefaultRtt.ToAbsl(); + stats_.rtt_variation = absl::ZeroDuration(); + stats_.application_bytes_acknowledged = 0; + stats_.estimated_send_rate_bps = kDefaultBitrate.ToBitsPerSecond(); + + // Drive the connection in the steady state until the probe is activated. + EXPECT_CALL(adjustable_, OnBitrateAdjusted).Times(0); + std::optional<MoqtProbeManagerInterface::Callback> probe_callback; + quic::QuicByteCount requested_probe_size; + EXPECT_CALL(*probe_manager_, StartProbe) + .WillOnce([&](quic::QuicByteCount probe_size, quic::QuicTimeDelta timeout, + MoqtProbeManagerInterface::Callback callback) { + requested_probe_size = probe_size; + probe_callback = std::move(callback); + return 12345; + }); + for (int i = 0; i < 2500; ++i) { + clock_.AdvanceTime(kDefaultRtt); + adjuster_.OnObjectAckReceived(Location(i, 0), kDefaultTimeScale * 0.9); + if (probe_callback.has_value()) { + break; + } + } + ASSERT_TRUE(probe_callback.has_value()); + + const QuicTimeDelta kProbeDuration = 20 * kDefaultRtt; + clock_.AdvanceTime(kProbeDuration); + stats_.application_bytes_acknowledged += (1 << 30); // Arbitrary big number. + stats_.estimated_send_rate_bps = 2 * kDefaultBitrate.ToBitsPerSecond(); + ProbeResult result{.id = 12345, + .status = ProbeStatus::kSuccess, + .probe_size = requested_probe_size, + .time_elapsed = kProbeDuration}; + EXPECT_CALL(adjustable_, OnBitrateAdjusted(kDefaultBitrate * 1.8)); + std::move (*probe_callback)(result); +} + +TEST_F(MoqtBitrateAdjusterTest, ProbeUpNotEnteredInPrecariousState) { + stats_.min_rtt = kDefaultRtt.ToAbsl(); + stats_.smoothed_rtt = kDefaultRtt.ToAbsl(); + stats_.rtt_variation = absl::ZeroDuration(); + stats_.application_bytes_acknowledged = 0; + stats_.estimated_send_rate_bps = kDefaultBitrate.ToBitsPerSecond(); + + EXPECT_CALL(adjustable_, OnBitrateAdjusted).Times(0); + EXPECT_CALL(*probe_manager_, StartProbe).Times(0); + for (int i = 0; i < 2500; ++i) { + clock_.AdvanceTime(kDefaultRtt); + adjuster_.OnObjectAckReceived(Location(i, 0), kDefaultTimeScale * 0.5); + } +} + +TEST_F(MoqtBitrateAdjusterTest, ProbeUpIgnoredDueToBeingTooShort) { + stats_.min_rtt = kDefaultRtt.ToAbsl(); + stats_.smoothed_rtt = kDefaultRtt.ToAbsl(); + stats_.rtt_variation = absl::ZeroDuration(); + stats_.application_bytes_acknowledged = 0; + stats_.estimated_send_rate_bps = kDefaultBitrate.ToBitsPerSecond(); + + // Drive the connection in the steady state until the probe is activated. + EXPECT_CALL(adjustable_, OnBitrateAdjusted).Times(0); + std::optional<MoqtProbeManagerInterface::Callback> probe_callback; + quic::QuicByteCount requested_probe_size; + EXPECT_CALL(*probe_manager_, StartProbe) + .WillOnce([&](quic::QuicByteCount probe_size, quic::QuicTimeDelta timeout, + MoqtProbeManagerInterface::Callback callback) { + requested_probe_size = probe_size; + probe_callback = std::move(callback); + return 12345; + }); + for (int i = 0; i < 2500; ++i) { + clock_.AdvanceTime(kDefaultRtt); + adjuster_.OnObjectAckReceived(Location(i, 0), kDefaultTimeScale * 0.9); + if (probe_callback.has_value()) { + break; + } + } + ASSERT_TRUE(probe_callback.has_value()); + + const QuicTimeDelta kProbeDuration = kDefaultRtt; + clock_.AdvanceTime(kProbeDuration); + stats_.application_bytes_acknowledged += (1 << 30); // Arbitrary big number. + stats_.estimated_send_rate_bps = 2 * kDefaultBitrate.ToBitsPerSecond(); + ProbeResult result{.id = 12345, + .status = ProbeStatus::kSuccess, + .probe_size = requested_probe_size, + .time_elapsed = kProbeDuration}; + std::move (*probe_callback)(result); +} + +TEST_F(MoqtBitrateAdjusterTest, ProbeUpUsesAverage) { + stats_.min_rtt = kDefaultRtt.ToAbsl(); + stats_.smoothed_rtt = kDefaultRtt.ToAbsl(); + stats_.rtt_variation = absl::ZeroDuration(); + stats_.application_bytes_acknowledged = 0; + stats_.estimated_send_rate_bps = kDefaultBitrate.ToBitsPerSecond(); + + // Drive the connection in the steady state until the probe is activated. + EXPECT_CALL(adjustable_, OnBitrateAdjusted).Times(0); + std::optional<MoqtProbeManagerInterface::Callback> probe_callback; + quic::QuicByteCount requested_probe_size; + EXPECT_CALL(*probe_manager_, StartProbe) + .WillOnce([&](quic::QuicByteCount probe_size, quic::QuicTimeDelta timeout, + MoqtProbeManagerInterface::Callback callback) { + requested_probe_size = probe_size; + probe_callback = std::move(callback); + return 12345; + }); + for (int i = 0; i < 2500; ++i) { + clock_.AdvanceTime(kDefaultRtt); + adjuster_.OnObjectAckReceived(Location(i, 0), kDefaultTimeScale * 0.9); + if (probe_callback.has_value()) { + break; + } + } + ASSERT_TRUE(probe_callback.has_value()); + + const QuicTimeDelta kProbeDuration = 19 * kDefaultRtt; + const QuicBandwidth kNewBandwidth = 2 * kDefaultBitrate; + clock_.AdvanceTime(kProbeDuration); + stats_.application_bytes_acknowledged += kNewBandwidth * kProbeDuration; + stats_.estimated_send_rate_bps = (100 * kDefaultBitrate).ToBitsPerSecond(); + ProbeResult result{.id = 12345, + .status = ProbeStatus::kSuccess, + .probe_size = requested_probe_size, + .time_elapsed = kProbeDuration}; + EXPECT_CALL(adjustable_, OnBitrateAdjusted(kNewBandwidth * (19.0 / 20.0))); + std::move (*probe_callback)(result); +} + +TEST_F(MoqtBitrateAdjusterTest, ProbeUpCancelInBadState) { + stats_.min_rtt = kDefaultRtt.ToAbsl(); + stats_.smoothed_rtt = kDefaultRtt.ToAbsl(); + stats_.rtt_variation = absl::ZeroDuration(); + stats_.application_bytes_acknowledged = 0; + stats_.estimated_send_rate_bps = kDefaultBitrate.ToBitsPerSecond(); + + // Drive the connection in the steady state until the probe is activated. + EXPECT_CALL(adjustable_, OnBitrateAdjusted).Times(0); + std::optional<MoqtProbeManagerInterface::Callback> probe_callback; + quic::QuicByteCount requested_probe_size; + EXPECT_CALL(*probe_manager_, StartProbe) + .WillOnce([&](quic::QuicByteCount probe_size, quic::QuicTimeDelta timeout, + MoqtProbeManagerInterface::Callback callback) { + requested_probe_size = probe_size; + probe_callback = std::move(callback); + return 12345; + }); + for (int i = 0; i < 2500; ++i) { + clock_.AdvanceTime(kDefaultRtt); + adjuster_.OnObjectAckReceived(Location(i, 0), kDefaultTimeScale * 0.9); + if (probe_callback.has_value()) { + break; + } + } + ASSERT_TRUE(probe_callback.has_value()); + + EXPECT_CALL(*probe_manager_, HasActiveProbe).WillRepeatedly(Return(true)); + EXPECT_CALL(*probe_manager_, StopProbe).Times(0); + clock_.AdvanceTime(0.1 * kDefaultRtt); + adjuster_.OnObjectAckReceived(Location(1000, 0), kDefaultTimeScale * 0.9); + clock_.AdvanceTime(0.1 * kDefaultRtt); + adjuster_.OnObjectAckReceived(Location(1001, 0), kDefaultTimeScale * 0.5); + EXPECT_CALL(*probe_manager_, StopProbe).Times(1); + EXPECT_CALL(adjustable_, OnBitrateAdjusted(kDefaultBitrate * 0.9)); + adjuster_.OnObjectAckReceived(Location(1002, 0), kDefaultTimeScale * 0.1); +} + TEST_F(MoqtBitrateAdjusterTest, AdjustDownOnce) { stats_.estimated_send_rate_bps = (0.5 * kDefaultBitrate).ToBitsPerSecond(); @@ -144,7 +342,7 @@ } TEST_F(MoqtBitrateAdjusterTest, Reordering) { - adjuster_.parameters().max_out_of_order_objects = 1; + adjuster_.parameters().quality_level_reordering_thresholds[0] = 1; clock_.AdvanceTime(100 * kDefaultRtt); stats_.estimated_send_rate_bps = (0.5 * kDefaultBitrate).ToBitsPerSecond();
diff --git a/quiche/quic/moqt/moqt_probe_manager.cc b/quiche/quic/moqt/moqt_probe_manager.cc index 3ff5bcc..fefab71 100644 --- a/quiche/quic/moqt/moqt_probe_manager.cc +++ b/quiche/quic/moqt/moqt_probe_manager.cc
@@ -122,6 +122,7 @@ void MoqtProbeManager::ClosePendingProbe(ProbeStatus status) { std::optional<PendingProbe> probe = std::move(probe_); + probe_.reset(); if (!probe.has_value()) { QUICHE_BUG(ClosePendingProbe_no_probe); return;
diff --git a/quiche/quic/moqt/moqt_probe_manager.h b/quiche/quic/moqt/moqt_probe_manager.h index f2d8639..04d38c9 100644 --- a/quiche/quic/moqt/moqt_probe_manager.h +++ b/quiche/quic/moqt/moqt_probe_manager.h
@@ -57,6 +57,8 @@ Callback callback) = 0; // Cancels the currently pending probe. virtual std::optional<ProbeId> StopProbe() = 0; + + virtual bool HasActiveProbe() const = 0; }; namespace test { @@ -81,6 +83,7 @@ quic::QuicTimeDelta timeout, Callback callback) override; std::optional<ProbeId> StopProbe() override; + bool HasActiveProbe() const override { return probe_.has_value(); } private: friend class ::moqt::test::MoqtProbeManagerPeer;
diff --git a/quiche/quic/moqt/test_tools/moqt_simulator.cc b/quiche/quic/moqt/test_tools/moqt_simulator.cc index c1448ec..80fac10 100644 --- a/quiche/quic/moqt/test_tools/moqt_simulator.cc +++ b/quiche/quic/moqt/test_tools/moqt_simulator.cc
@@ -268,7 +268,7 @@ TrackName(), parameters.keyframe_interval, parameters.fps, parameters.i_to_p_ratio, parameters.bitrate), adjuster_(simulator_.GetClock(), client_endpoint_.session()->session(), - &generator_), + simulator_.GetAlarmFactory(), &generator_), parameters_(parameters) { if (parameters.aggregation_threshold > 0) { QuicTimeDelta timeout = parameters.aggregation_timeout;
diff --git a/quiche/quic/moqt/test_tools/moqt_simulator_test.cc b/quiche/quic/moqt/test_tools/moqt_simulator_test.cc index 0d37eab..dd0ab58 100644 --- a/quiche/quic/moqt/test_tools/moqt_simulator_test.cc +++ b/quiche/quic/moqt/test_tools/moqt_simulator_test.cc
@@ -42,14 +42,24 @@ MoqtSimulator simulator(SimulationParameters{}); simulator.Run(); EXPECT_NEAR(simulator.received_on_time_fraction(), 1.0f, 0.001f); - EXPECT_EQ(CountEventType(simulator.client_trace(), - EventType::MOQT_TARGET_BITRATE_SET), - 0); EXPECT_EQ(GetCongestionControlType(*simulator.client_quic_session()), quic::CongestionControlType::kBBR); EXPECT_EQ(GetCongestionControlType(*simulator.server_quic_session()), quic::CongestionControlType::kBBR); + + int bitrate_events = 0; + for (const quic_trace::Event& event : simulator.client_trace().events()) { + if (event.event_type() != EventType::MOQT_TARGET_BITRATE_SET) { + continue; + } + ++bitrate_events; + EXPECT_GT(event.bandwidth_estimate_bps(), + SimulationParameters().bitrate.ToBitsPerSecond()); + EXPECT_LT(event.bandwidth_estimate_bps(), + SimulationParameters().bandwidth.ToBitsPerSecond()); + } + EXPECT_GT(bitrate_events, 0); } // Ensure that the bitrate adaptation down works.
diff --git a/quiche/quic/moqt/tools/moqt_simulator_bin.cc b/quiche/quic/moqt/tools/moqt_simulator_bin.cc index 8bf8874..6ef6c37 100644 --- a/quiche/quic/moqt/tools/moqt_simulator_bin.cc +++ b/quiche/quic/moqt/tools/moqt_simulator_bin.cc
@@ -33,6 +33,11 @@ "Bandwidth of the simulated link, in kilobits per second."); DEFINE_QUICHE_COMMAND_LINE_FLAG( + uint64_t, bitrate, + moqt::test::SimulationParameters().bitrate.ToKBitsPerSecond(), + "Initial bitrate of the simulated media."); + +DEFINE_QUICHE_COMMAND_LINE_FLAG( absl::Duration, deadline, moqt::test::SimulationParameters().deadline.ToAbsl(), "Frame delivery deadline (used for measurement only)."); @@ -102,6 +107,8 @@ quiche::QuicheParseCommandLineFlags("moqt_simulator", argc, argv); parameters.bandwidth = quic::QuicBandwidth::FromKBitsPerSecond( quiche::GetQuicheCommandLineFlag(FLAGS_bandwidth)); + parameters.bitrate = quic::QuicBandwidth::FromKBitsPerSecond( + quiche::GetQuicheCommandLineFlag(FLAGS_bitrate)); parameters.deadline = quic::QuicTimeDelta(quiche::GetQuicheCommandLineFlag(FLAGS_deadline)); parameters.duration =