Implement bandwidth probing in MoQ bitrate adjuster.

This CL introduces various level of "connection quality" for MoQT connection, and adjust bitrate and bandwidth probes based on the quality level.

PiperOrigin-RevId: 855310781
diff --git a/quiche/quic/moqt/moqt_bitrate_adjuster.cc b/quiche/quic/moqt/moqt_bitrate_adjuster.cc
index 3a99e6f..37990cf 100644
--- a/quiche/quic/moqt/moqt_bitrate_adjuster.cc
+++ b/quiche/quic/moqt/moqt_bitrate_adjuster.cc
@@ -4,12 +4,21 @@
 
 #include "quiche/quic/moqt/moqt_bitrate_adjuster.h"
 
+#include <algorithm>
 #include <cstdlib>
+#include <memory>
 #include <optional>
+#include <string>
+#include <utility>
 
+#include "absl/functional/bind_front.h"
+#include "absl/time/time.h"
+#include "quiche/quic/core/quic_alarm_factory.h"
 #include "quiche/quic/core/quic_bandwidth.h"
 #include "quiche/quic/core/quic_time.h"
+#include "quiche/quic/core/quic_types.h"
 #include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_probe_manager.h"
 #include "quiche/common/platform/api/quiche_bug_tracker.h"
 #include "quiche/common/platform/api/quiche_logging.h"
 #include "quiche/web_transport/web_transport.h"
@@ -19,20 +28,41 @@
 namespace {
 
 using ::quic::QuicBandwidth;
+using ::quic::QuicByteCount;
 using ::quic::QuicTime;
 using ::quic::QuicTimeDelta;
 
 }  // namespace
 
+MoqtBitrateAdjuster::MoqtBitrateAdjuster(
+    const quic::QuicClock* clock, webtransport::Session* session,
+    std::unique_ptr<MoqtProbeManagerInterface> probe_manager,
+    BitrateAdjustable* adjustable)
+    : clock_(clock),
+      session_(session),
+      adjustable_(adjustable),
+      probe_manager_(std::move(probe_manager)) {}
+
+MoqtBitrateAdjuster::MoqtBitrateAdjuster(const quic::QuicClock* clock,
+                                         webtransport::Session* session,
+                                         quic::QuicAlarmFactory* alarm_factory,
+                                         BitrateAdjustable* adjustable)
+    : MoqtBitrateAdjuster(clock, session,
+                          std::make_unique<MoqtProbeManager>(
+                              session, clock, *alarm_factory, &trace_recorder_),
+                          adjustable) {}
+
 void MoqtBitrateAdjuster::Start() {
   if (start_time_.IsInitialized()) {
     QUICHE_BUG(MoqtBitrateAdjuster_double_init)
         << "MoqtBitrateAdjuster::Start() called more than once.";
     return;
   }
+
   start_time_ = clock_->Now();
   outstanding_objects_.emplace(
-      /*max_out_of_order_objects=*/parameters_.max_out_of_order_objects);
+      /*max_out_of_order_objects=*/parameters_
+          .quality_level_reordering_thresholds[0]);
 }
 
 void MoqtBitrateAdjuster::OnObjectAckReceived(
@@ -40,6 +70,7 @@
   if (!start_time_.IsInitialized() || !outstanding_objects_.has_value()) {
     return;
   }
+  const QuicTime now = clock_->Now();
 
   // Update the state.
   int reordering_delta = outstanding_objects_->OnObjectAcked(location);
@@ -48,9 +79,25 @@
   if (!ShouldUseAckAsActionSignal(location)) {
     return;
   }
-  if (ShouldAttemptAdjustingDown(reordering_delta, delta_from_deadline)) {
+  const ConnectionQualityLevel quality_level =
+      GetQualityLevel(reordering_delta, delta_from_deadline);
+
+  if (quality_level == ConnectionQualityLevel::kStable) {
+    if (!in_stable_state_since_.has_value()) {
+      in_stable_state_since_ = now;
+    }
+  } else {
+    in_stable_state_since_.reset();
+  }
+
+  if (quality_level <= ConnectionQualityLevel::kVeryPrecarious) {
+    probe_manager_->StopProbe();
+  }
+
+  if (quality_level <= ConnectionQualityLevel::kCritical) {
     AttemptAdjustingDown();
   }
+  MaybeStartProbing(now);
 }
 
 bool MoqtBitrateAdjuster::ShouldUseAckAsActionSignal(Location location) {
@@ -68,22 +115,34 @@
   return !too_early_in_the_connection && !is_out_of_order_ack;
 }
 
-bool MoqtBitrateAdjuster::ShouldAttemptAdjustingDown(
+ConnectionQualityLevel MoqtBitrateAdjuster::GetQualityLevel(
     int reordering_delta, quic::QuicTimeDelta delta_from_deadline) const {
-  const bool has_exceeded_max_out_of_order =
-      reordering_delta > parameters_.max_out_of_order_objects;
-  QUICHE_DLOG_IF(INFO, has_exceeded_max_out_of_order)
-      << "Adjusting connection down due to reordering, delta: "
-      << reordering_delta;
+  for (int i = 0; i < kNumQualityLevels - 1; ++i) {
+    const ConnectionQualityLevel level = static_cast<ConnectionQualityLevel>(i);
 
-  const bool time_delta_too_close =
-      delta_from_deadline < parameters_.adjust_down_threshold * time_window_;
-  QUICHE_DLOG_IF(INFO, time_delta_too_close)
-      << "Adjusting connection down due to object arriving too late, time "
-         "delta: "
-      << delta_from_deadline;
+    const int max_out_of_order_objects =
+        parameters_.quality_level_reordering_thresholds[i];
+    const bool has_exceeded_max_out_of_order =
+        reordering_delta > max_out_of_order_objects;
+    if (has_exceeded_max_out_of_order) {
+      QUICHE_DLOG(INFO) << "Downgrading connection quality down to " << level
+                        << " due to reordering, delta: " << reordering_delta;
+      return level;
+    }
 
-  return has_exceeded_max_out_of_order || time_delta_too_close;
+    const float time_delta_threshold =
+        parameters_.quality_level_time_thresholds[i];
+    const bool time_delta_too_close =
+        delta_from_deadline < time_delta_threshold * time_window_;
+    if (time_delta_too_close) {
+      QUICHE_DLOG(INFO) << "Downgrading connection quality down to " << level
+                        << "due to object arriving too late, time delta: "
+                        << delta_from_deadline;
+      return level;
+    }
+  }
+
+  return ConnectionQualityLevel::kStable;
 }
 
 void MoqtBitrateAdjuster::AttemptAdjustingDown() {
@@ -145,4 +204,104 @@
   outstanding_objects_->OnObjectAdded(location);
 }
 
+void MoqtBitrateAdjuster::MaybeStartProbing(QuicTime now) {
+  if (!in_stable_state_since_.has_value() || probe_manager_->HasActiveProbe() ||
+      !adjustable_->CouldUseExtraBandwidth()) {
+    return;
+  }
+
+  const QuicTime start_probe_after =
+      *in_stable_state_since_ + parameters_.time_before_probing;
+  if (now < start_probe_after) {
+    return;
+  }
+
+  const webtransport::SessionStats stats = session_->GetSessionStats();
+  const QuicBandwidth current_bandwidth =
+      std::min(adjustable_->GetCurrentBitrate(),
+               QuicBandwidth::FromBitsPerSecond(stats.estimated_send_rate_bps));
+  const QuicBandwidth target_bandwidth =
+      parameters_.probe_increase_target * current_bandwidth;
+  // Approximation of PTO as defined in RFC 9002, Section 6.2.1.
+  const QuicTimeDelta pto(stats.smoothed_rtt + 4 * stats.rtt_variation);
+  const QuicByteCount probe_size =
+      parameters_.round_trips_for_probe * pto * target_bandwidth;
+  const QuicTimeDelta probe_timeout =
+      current_bandwidth.TransferTime(probe_size) + pto;
+  std::optional<ProbeId> probe_id = probe_manager_->StartProbe(
+      probe_size, probe_timeout,
+      absl::bind_front(&MoqtBitrateAdjuster::OnProbeResult, this, stats));
+
+  if (!probe_id.has_value()) {
+    QUICHE_DLOG(WARNING)
+        << "Failed to create a new probe. Most likely blocked by flow control.";
+    in_stable_state_since_.reset();
+  }
+}
+
+void MoqtBitrateAdjuster::OnProbeResult(
+    const webtransport::SessionStats& old_stats, const ProbeResult& result) {
+  // Clear the timer before the next probe regardless of what we do with the
+  // results of this one.
+  in_stable_state_since_.reset();
+
+  // It is possible for the probe to be cancelled due to poor connection status
+  // (among other reasons).  In that case, we shouldn't attempt to increase the
+  // bitrate.
+  if (result.status == ProbeStatus::kAborted) {
+    return;
+  }
+
+  // While the probe can also be timed out, it is still possible to get a useful
+  // bandwidth increase from a timed-out probe.  Check for a probe duration
+  // threshold instead.
+  const webtransport::SessionStats stats = session_->GetSessionStats();
+  const double probe_duration_in_rtts =
+      result.time_elapsed.ToMicroseconds() /
+      absl::ToDoubleMicroseconds(stats.smoothed_rtt);
+  if (probe_duration_in_rtts < parameters_.min_probe_duration_in_rtts) {
+    return;
+  }
+
+  // Use the bandwidth estimate from the congestion controller as the main
+  // input.
+  const QuicBandwidth congestion_control_bandwidth =
+      QuicBandwidth::FromBitsPerSecond(stats.estimated_send_rate_bps);
+
+  // Use the long-term average over the duration of entire probe as the input.
+  const QuicByteCount acked_bytes = stats.application_bytes_acknowledged -
+                                    old_stats.application_bytes_acknowledged;
+  const QuicBandwidth average_goodput_over_probe_duration =
+      QuicBandwidth::FromBytesAndTimeDelta(acked_bytes, result.time_elapsed);
+  // If the probe has lasted N RTTs, assume that it could have accidentally
+  // included an extra RTT worth of data due to ack aggregation.
+  const double average_goodput_adjustment =
+      probe_duration_in_rtts / (probe_duration_in_rtts + 1);
+
+  // Use congestion controller bandwidth with the long-term average as a limit.
+  const QuicBandwidth target_bandwidth = std::min(
+      congestion_control_bandwidth * parameters_.target_bitrate_multiplier_up,
+      average_goodput_adjustment * average_goodput_over_probe_duration);
+
+  QUICHE_DLOG(INFO) << "Adjusting the bitrate up to " << target_bandwidth;
+  SuggestNewBitrate(target_bandwidth, BitrateAdjustmentType::kUp);
+}
+
+std::string ConnectionQualityLevelToString(ConnectionQualityLevel level) {
+  switch (level) {
+    case ConnectionQualityLevel::kCritical:
+      return "Critical";
+    case ConnectionQualityLevel::kVeryPrecarious:
+      return "VeryPrecarious";
+    case ConnectionQualityLevel::kPrecarious:
+      return "Precarious";
+    case ConnectionQualityLevel::kStable:
+      return "Stable";
+
+    case ConnectionQualityLevel::kNumLevels:
+      break;
+  }
+  return "[unknown]";
+}
+
 }  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_bitrate_adjuster.h b/quiche/quic/moqt/moqt_bitrate_adjuster.h
index 917bbcf..1a3aa89 100644
--- a/quiche/quic/moqt/moqt_bitrate_adjuster.h
+++ b/quiche/quic/moqt/moqt_bitrate_adjuster.h
@@ -5,13 +5,17 @@
 #ifndef QUICHE_QUIC_MOQT_MOQT_BITRATE_ADJUSTER_H_
 #define QUICHE_QUIC_MOQT_MOQT_BITRATE_ADJUSTER_H_
 
+#include <memory>
 #include <optional>
+#include <string>
 
+#include "quiche/quic/core/quic_alarm_factory.h"
 #include "quiche/quic/core/quic_bandwidth.h"
 #include "quiche/quic/core/quic_clock.h"
 #include "quiche/quic/core/quic_time.h"
 #include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/moqt/moqt_outstanding_objects.h"
+#include "quiche/quic/moqt/moqt_probe_manager.h"
 #include "quiche/quic/moqt/moqt_session.h"
 #include "quiche/quic/moqt/moqt_trace_recorder.h"
 #include "quiche/web_transport/web_transport.h"
@@ -49,6 +53,35 @@
                                         BitrateAdjustmentType type) = 0;
 };
 
+// Connection quality levels.  Used by the bitrate adjuster to decide when to
+// probe up or down.
+enum class ConnectionQualityLevel {
+  // Lowest quality level.  Reaching it will result in the bitrate adjuster
+  // attempting to immediately lower the bitrate.
+  kCritical,
+  // At this level, no new bandwidth probes will be started and all existing
+  // bandwidth probes are cancelled.
+  kVeryPrecarious,
+  // At this level, the adjuster will not start any new probes, but all of the
+  // previous probes will be allowed to continue.  This level is separate from
+  // the previous one to avoid unstable behavior around the boundaries.
+  kPrecarious,
+  // Highest quality level.  The adjuster will attempt to probe bandwidth if
+  // necessary.
+  kStable,
+
+  kNumLevels,
+};
+
+std::string ConnectionQualityLevelToString(ConnectionQualityLevel level);
+template <typename Sink>
+void AbslStringify(Sink& sink, ConnectionQualityLevel level) {
+  sink.Append(ConnectionQualityLevelToString(level));
+}
+
+inline constexpr int kNumQualityLevels =
+    static_cast<int>(ConnectionQualityLevel::kNumLevels);
+
 // Parameters (mostly magic numbers) that determine behavior of
 // MoqtBitrateAdjuster.
 struct MoqtBitrateAdjusterParameters {
@@ -57,20 +90,47 @@
   // estimate tends to be overly optimistic in practice.
   float target_bitrate_multiplier_down = 0.9f;
 
+  // Same, but applies for adjusting up.  Similar considerations for selecting
+  // the value apply.
+  float target_bitrate_multiplier_up = 0.9f;
+
   // Do not perform any updates within `initial_delay` after the connection
   // start.
   quic::QuicTimeDelta initial_delay = quic::QuicTimeDelta::FromSeconds(2);
 
-  // If the object arrives too close to the deadline, the bitrate will be
-  // adjusted down.  The threshold is expressed as a fraction of `time_window`
-  // (which typically would be equal to the size of the buffer in seconds).
-  float adjust_down_threshold = 0.1f;
+  // Quality level thresholds.  If the object arrives with the
+  // time-before-deadline that is lower than the first number listed here, it
+  // will result in the connection being assigned the lowest quality level, and
+  // so on.  The thresholds are expressed as a fraction of `time_window` (which
+  // typically would be equal to the size of the buffer in seconds).
+  float quality_level_time_thresholds[kNumQualityLevels - 1] = {0.2f, 0.4f,
+                                                                0.6f};
 
   // The maximum gap between the next object expected to be received, and the
-  // actually received object, expressed as a number of objects.
+  // actually received object, expressed as a number of objects. If the
+  // reordering gap exceeds the configured threshold, the connection is marked
+  // as being in that quality level. The thresholds correspond to kCritical,
+  // kVeryPrecarious, and kPrecarious.
   //
-  // The default is 12, which corresponds to about 400ms for 30fps video.
-  int max_out_of_order_objects = 12;
+  // The default for the worst-case reordering is 12, which corresponds to about
+  // 400ms for 30fps video.
+  int quality_level_reordering_thresholds[kNumQualityLevels - 1] = {12, 6, 3};
+
+  // Amount of time the connection has to spend in good state before attempting
+  // to probe for bandwidth.
+  quic::QuicTimeDelta time_before_probing = quic::QuicTimeDelta::FromSeconds(2);
+
+  // When probing, attempt to increase the bandwidth by the specified factor.
+  // Used when determining the probe size and timeout.
+  float probe_increase_target = 1.2;
+
+  // Expected duration of a probe, expressed in the number of round-trips
+  // necessary. Selecting values below 8 might interact negatively with BBR.
+  float round_trips_for_probe = 16;
+
+  // Probe results will be ignored if the probe was cancelled before lasting for
+  // the specified duration.
+  float min_probe_duration_in_rtts = 4;
 };
 
 // MoqtBitrateAdjuster monitors the progress of delivery for a single track, and
@@ -79,8 +139,12 @@
  public:
   MoqtBitrateAdjuster(const quic::QuicClock* clock,
                       webtransport::Session* session,
-                      BitrateAdjustable* adjustable)
-      : clock_(clock), session_(session), adjustable_(adjustable) {}
+                      quic::QuicAlarmFactory* alarm_factory,
+                      BitrateAdjustable* adjustable);
+  MoqtBitrateAdjuster(const quic::QuicClock* clock,
+                      webtransport::Session* session,
+                      std::unique_ptr<MoqtProbeManagerInterface> probe_manager,
+                      BitrateAdjustable* adjustable);
 
   // MoqtPublishingMonitorInterface implementation.
   void OnObjectAckSupportKnown(
@@ -100,9 +164,15 @@
 
   // Checks if the bitrate should be adjusted down based on the result of
   // processing an object ACK.
-  bool ShouldAttemptAdjustingDown(
+  ConnectionQualityLevel GetQualityLevel(
       int reordering_delta, quic::QuicTimeDelta delta_from_deadline) const;
 
+  // Starts a bandwidth probe if all the conditions are met.
+  void MaybeStartProbing(quic::QuicTime now);
+  // Callback called whenever the bandwidth probe is finished.
+  void OnProbeResult(const webtransport::SessionStats& old_stats,
+                     const ProbeResult& result);
+
   // Attempts adjusting the bitrate down.
   void AttemptAdjustingDown();
 
@@ -122,7 +192,12 @@
   // the scale for incoming time deltas in the object ACKs.
   quic::QuicTimeDelta time_window_ = quic::QuicTimeDelta::Zero();
 
+  // The earliest point at which the connection has been continuously in the
+  // stable state.  Used to time bandwidth probes.
+  std::optional<quic::QuicTime> in_stable_state_since_;
+
   std::optional<MoqtOutstandingObjects> outstanding_objects_;
+  std::unique_ptr<MoqtProbeManagerInterface> probe_manager_;
   Location last_acked_object_;
 };
 
diff --git a/quiche/quic/moqt/moqt_bitrate_adjuster_test.cc b/quiche/quic/moqt/moqt_bitrate_adjuster_test.cc
index aed88ce..ee5a747 100644
--- a/quiche/quic/moqt/moqt_bitrate_adjuster_test.cc
+++ b/quiche/quic/moqt/moqt_bitrate_adjuster_test.cc
@@ -4,9 +4,16 @@
 
 #include "quiche/quic/moqt/moqt_bitrate_adjuster.h"
 
+#include <memory>
+#include <optional>
+#include <utility>
+
+#include "absl/time/time.h"
 #include "quiche/quic/core/quic_bandwidth.h"
 #include "quiche/quic/core/quic_time.h"
+#include "quiche/quic/core/quic_types.h"
 #include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_probe_manager.h"
 #include "quiche/quic/test_tools/mock_clock.h"
 #include "quiche/common/platform/api/quiche_test.h"
 #include "quiche/web_transport/test_tools/mock_web_transport.h"
@@ -18,6 +25,7 @@
 using ::quic::QuicBandwidth;
 using ::quic::QuicTimeDelta;
 using ::testing::_;
+using ::testing::Return;
 
 // Simple adjustable object that just keeps track of whatever value has been
 // assigned to it, and has a mock method to notify of it changing.
@@ -40,8 +48,18 @@
   QuicBandwidth bitrate_;
 };
 
+class MockProbeManager : public MoqtProbeManagerInterface {
+ public:
+  MOCK_METHOD(std::optional<ProbeId>, StartProbe,
+              (quic::QuicByteCount probe_size, quic::QuicTimeDelta timeout,
+               Callback callback),
+              (override));
+  MOCK_METHOD(std::optional<ProbeId>, StopProbe, (), (override));
+  MOCK_METHOD(bool, HasActiveProbe, (), (const, override));
+};
+
 constexpr QuicBandwidth kDefaultBitrate =
-    QuicBandwidth::FromBitsPerSecond(2000);
+    QuicBandwidth::FromKBitsPerSecond(2000);
 constexpr QuicTimeDelta kDefaultRtt = QuicTimeDelta::FromMilliseconds(20);
 constexpr QuicTimeDelta kDefaultTimeScale = QuicTimeDelta::FromSeconds(1);
 
@@ -49,7 +67,7 @@
  protected:
   MoqtBitrateAdjusterTest()
       : adjustable_(kDefaultBitrate),
-        adjuster_(&clock_, &session_, &adjustable_) {
+        adjuster_(&clock_, &session_, CreateProbeManager(), &adjustable_) {
     stats_.min_rtt = stats_.smoothed_rtt = kDefaultRtt.ToAbsl();
     stats_.estimated_send_rate_bps = (1.2 * kDefaultBitrate).ToBitsPerSecond();
     ON_CALL(session_, GetSessionStats()).WillByDefault([this] {
@@ -60,15 +78,24 @@
     adjuster_.OnObjectAckSupportKnown(kDefaultTimeScale);
   }
 
+  std::unique_ptr<MoqtProbeManagerInterface> CreateProbeManager() {
+    auto manager = std::make_unique<MockProbeManager>();
+    ON_CALL(*manager, HasActiveProbe).WillByDefault(Return(false));
+    probe_manager_ = manager.get();
+    return manager;
+  }
+
   MockBitrateAdjustable adjustable_;
   webtransport::SessionStats stats_;
   quic::MockClock clock_;
   webtransport::test::MockSession session_;
+  MockProbeManager* probe_manager_ = nullptr;
   MoqtBitrateAdjuster adjuster_;
 };
 
 TEST_F(MoqtBitrateAdjusterTest, IgnoreCallsBeforeStart) {
-  MoqtBitrateAdjuster uninitialized_adjuster(&clock_, &session_, &adjustable_);
+  MoqtBitrateAdjuster uninitialized_adjuster(
+      &clock_, &session_, CreateProbeManager(), &adjustable_);
   uninitialized_adjuster.OnNewObjectEnqueued(Location(1, 0));
   uninitialized_adjuster.OnObjectAckReceived(
       Location(1, 0), QuicTimeDelta::FromMilliseconds(100));
@@ -80,6 +107,7 @@
   stats_.estimated_send_rate_bps = 1;
 
   EXPECT_CALL(adjustable_, OnBitrateAdjusted).Times(0);
+  EXPECT_CALL(*probe_manager_, StartProbe).WillRepeatedly(Return(std::nullopt));
   for (int i = 0; i < 250; ++i) {
     clock_.AdvanceTime(kDefaultRtt);
     for (int j = 0; j < 10; ++j) {
@@ -88,6 +116,176 @@
   }
 }
 
+TEST_F(MoqtBitrateAdjusterTest, ProbeUp) {
+  stats_.min_rtt = kDefaultRtt.ToAbsl();
+  stats_.smoothed_rtt = kDefaultRtt.ToAbsl();
+  stats_.rtt_variation = absl::ZeroDuration();
+  stats_.application_bytes_acknowledged = 0;
+  stats_.estimated_send_rate_bps = kDefaultBitrate.ToBitsPerSecond();
+
+  // Drive the connection in the steady state until the probe is activated.
+  EXPECT_CALL(adjustable_, OnBitrateAdjusted).Times(0);
+  std::optional<MoqtProbeManagerInterface::Callback> probe_callback;
+  quic::QuicByteCount requested_probe_size;
+  EXPECT_CALL(*probe_manager_, StartProbe)
+      .WillOnce([&](quic::QuicByteCount probe_size, quic::QuicTimeDelta timeout,
+                    MoqtProbeManagerInterface::Callback callback) {
+        requested_probe_size = probe_size;
+        probe_callback = std::move(callback);
+        return 12345;
+      });
+  for (int i = 0; i < 2500; ++i) {
+    clock_.AdvanceTime(kDefaultRtt);
+    adjuster_.OnObjectAckReceived(Location(i, 0), kDefaultTimeScale * 0.9);
+    if (probe_callback.has_value()) {
+      break;
+    }
+  }
+  ASSERT_TRUE(probe_callback.has_value());
+
+  const QuicTimeDelta kProbeDuration = 20 * kDefaultRtt;
+  clock_.AdvanceTime(kProbeDuration);
+  stats_.application_bytes_acknowledged += (1 << 30);  // Arbitrary big number.
+  stats_.estimated_send_rate_bps = 2 * kDefaultBitrate.ToBitsPerSecond();
+  ProbeResult result{.id = 12345,
+                     .status = ProbeStatus::kSuccess,
+                     .probe_size = requested_probe_size,
+                     .time_elapsed = kProbeDuration};
+  EXPECT_CALL(adjustable_, OnBitrateAdjusted(kDefaultBitrate * 1.8));
+  std::move (*probe_callback)(result);
+}
+
+TEST_F(MoqtBitrateAdjusterTest, ProbeUpNotEnteredInPrecariousState) {
+  stats_.min_rtt = kDefaultRtt.ToAbsl();
+  stats_.smoothed_rtt = kDefaultRtt.ToAbsl();
+  stats_.rtt_variation = absl::ZeroDuration();
+  stats_.application_bytes_acknowledged = 0;
+  stats_.estimated_send_rate_bps = kDefaultBitrate.ToBitsPerSecond();
+
+  EXPECT_CALL(adjustable_, OnBitrateAdjusted).Times(0);
+  EXPECT_CALL(*probe_manager_, StartProbe).Times(0);
+  for (int i = 0; i < 2500; ++i) {
+    clock_.AdvanceTime(kDefaultRtt);
+    adjuster_.OnObjectAckReceived(Location(i, 0), kDefaultTimeScale * 0.5);
+  }
+}
+
+TEST_F(MoqtBitrateAdjusterTest, ProbeUpIgnoredDueToBeingTooShort) {
+  stats_.min_rtt = kDefaultRtt.ToAbsl();
+  stats_.smoothed_rtt = kDefaultRtt.ToAbsl();
+  stats_.rtt_variation = absl::ZeroDuration();
+  stats_.application_bytes_acknowledged = 0;
+  stats_.estimated_send_rate_bps = kDefaultBitrate.ToBitsPerSecond();
+
+  // Drive the connection in the steady state until the probe is activated.
+  EXPECT_CALL(adjustable_, OnBitrateAdjusted).Times(0);
+  std::optional<MoqtProbeManagerInterface::Callback> probe_callback;
+  quic::QuicByteCount requested_probe_size;
+  EXPECT_CALL(*probe_manager_, StartProbe)
+      .WillOnce([&](quic::QuicByteCount probe_size, quic::QuicTimeDelta timeout,
+                    MoqtProbeManagerInterface::Callback callback) {
+        requested_probe_size = probe_size;
+        probe_callback = std::move(callback);
+        return 12345;
+      });
+  for (int i = 0; i < 2500; ++i) {
+    clock_.AdvanceTime(kDefaultRtt);
+    adjuster_.OnObjectAckReceived(Location(i, 0), kDefaultTimeScale * 0.9);
+    if (probe_callback.has_value()) {
+      break;
+    }
+  }
+  ASSERT_TRUE(probe_callback.has_value());
+
+  const QuicTimeDelta kProbeDuration = kDefaultRtt;
+  clock_.AdvanceTime(kProbeDuration);
+  stats_.application_bytes_acknowledged += (1 << 30);  // Arbitrary big number.
+  stats_.estimated_send_rate_bps = 2 * kDefaultBitrate.ToBitsPerSecond();
+  ProbeResult result{.id = 12345,
+                     .status = ProbeStatus::kSuccess,
+                     .probe_size = requested_probe_size,
+                     .time_elapsed = kProbeDuration};
+  std::move (*probe_callback)(result);
+}
+
+TEST_F(MoqtBitrateAdjusterTest, ProbeUpUsesAverage) {
+  stats_.min_rtt = kDefaultRtt.ToAbsl();
+  stats_.smoothed_rtt = kDefaultRtt.ToAbsl();
+  stats_.rtt_variation = absl::ZeroDuration();
+  stats_.application_bytes_acknowledged = 0;
+  stats_.estimated_send_rate_bps = kDefaultBitrate.ToBitsPerSecond();
+
+  // Drive the connection in the steady state until the probe is activated.
+  EXPECT_CALL(adjustable_, OnBitrateAdjusted).Times(0);
+  std::optional<MoqtProbeManagerInterface::Callback> probe_callback;
+  quic::QuicByteCount requested_probe_size;
+  EXPECT_CALL(*probe_manager_, StartProbe)
+      .WillOnce([&](quic::QuicByteCount probe_size, quic::QuicTimeDelta timeout,
+                    MoqtProbeManagerInterface::Callback callback) {
+        requested_probe_size = probe_size;
+        probe_callback = std::move(callback);
+        return 12345;
+      });
+  for (int i = 0; i < 2500; ++i) {
+    clock_.AdvanceTime(kDefaultRtt);
+    adjuster_.OnObjectAckReceived(Location(i, 0), kDefaultTimeScale * 0.9);
+    if (probe_callback.has_value()) {
+      break;
+    }
+  }
+  ASSERT_TRUE(probe_callback.has_value());
+
+  const QuicTimeDelta kProbeDuration = 19 * kDefaultRtt;
+  const QuicBandwidth kNewBandwidth = 2 * kDefaultBitrate;
+  clock_.AdvanceTime(kProbeDuration);
+  stats_.application_bytes_acknowledged += kNewBandwidth * kProbeDuration;
+  stats_.estimated_send_rate_bps = (100 * kDefaultBitrate).ToBitsPerSecond();
+  ProbeResult result{.id = 12345,
+                     .status = ProbeStatus::kSuccess,
+                     .probe_size = requested_probe_size,
+                     .time_elapsed = kProbeDuration};
+  EXPECT_CALL(adjustable_, OnBitrateAdjusted(kNewBandwidth * (19.0 / 20.0)));
+  std::move (*probe_callback)(result);
+}
+
+TEST_F(MoqtBitrateAdjusterTest, ProbeUpCancelInBadState) {
+  stats_.min_rtt = kDefaultRtt.ToAbsl();
+  stats_.smoothed_rtt = kDefaultRtt.ToAbsl();
+  stats_.rtt_variation = absl::ZeroDuration();
+  stats_.application_bytes_acknowledged = 0;
+  stats_.estimated_send_rate_bps = kDefaultBitrate.ToBitsPerSecond();
+
+  // Drive the connection in the steady state until the probe is activated.
+  EXPECT_CALL(adjustable_, OnBitrateAdjusted).Times(0);
+  std::optional<MoqtProbeManagerInterface::Callback> probe_callback;
+  quic::QuicByteCount requested_probe_size;
+  EXPECT_CALL(*probe_manager_, StartProbe)
+      .WillOnce([&](quic::QuicByteCount probe_size, quic::QuicTimeDelta timeout,
+                    MoqtProbeManagerInterface::Callback callback) {
+        requested_probe_size = probe_size;
+        probe_callback = std::move(callback);
+        return 12345;
+      });
+  for (int i = 0; i < 2500; ++i) {
+    clock_.AdvanceTime(kDefaultRtt);
+    adjuster_.OnObjectAckReceived(Location(i, 0), kDefaultTimeScale * 0.9);
+    if (probe_callback.has_value()) {
+      break;
+    }
+  }
+  ASSERT_TRUE(probe_callback.has_value());
+
+  EXPECT_CALL(*probe_manager_, HasActiveProbe).WillRepeatedly(Return(true));
+  EXPECT_CALL(*probe_manager_, StopProbe).Times(0);
+  clock_.AdvanceTime(0.1 * kDefaultRtt);
+  adjuster_.OnObjectAckReceived(Location(1000, 0), kDefaultTimeScale * 0.9);
+  clock_.AdvanceTime(0.1 * kDefaultRtt);
+  adjuster_.OnObjectAckReceived(Location(1001, 0), kDefaultTimeScale * 0.5);
+  EXPECT_CALL(*probe_manager_, StopProbe).Times(1);
+  EXPECT_CALL(adjustable_, OnBitrateAdjusted(kDefaultBitrate * 0.9));
+  adjuster_.OnObjectAckReceived(Location(1002, 0), kDefaultTimeScale * 0.1);
+}
+
 TEST_F(MoqtBitrateAdjusterTest, AdjustDownOnce) {
   stats_.estimated_send_rate_bps = (0.5 * kDefaultBitrate).ToBitsPerSecond();
 
@@ -144,7 +342,7 @@
 }
 
 TEST_F(MoqtBitrateAdjusterTest, Reordering) {
-  adjuster_.parameters().max_out_of_order_objects = 1;
+  adjuster_.parameters().quality_level_reordering_thresholds[0] = 1;
   clock_.AdvanceTime(100 * kDefaultRtt);
   stats_.estimated_send_rate_bps = (0.5 * kDefaultBitrate).ToBitsPerSecond();
 
diff --git a/quiche/quic/moqt/moqt_probe_manager.cc b/quiche/quic/moqt/moqt_probe_manager.cc
index 3ff5bcc..fefab71 100644
--- a/quiche/quic/moqt/moqt_probe_manager.cc
+++ b/quiche/quic/moqt/moqt_probe_manager.cc
@@ -122,6 +122,7 @@
 
 void MoqtProbeManager::ClosePendingProbe(ProbeStatus status) {
   std::optional<PendingProbe> probe = std::move(probe_);
+  probe_.reset();
   if (!probe.has_value()) {
     QUICHE_BUG(ClosePendingProbe_no_probe);
     return;
diff --git a/quiche/quic/moqt/moqt_probe_manager.h b/quiche/quic/moqt/moqt_probe_manager.h
index f2d8639..04d38c9 100644
--- a/quiche/quic/moqt/moqt_probe_manager.h
+++ b/quiche/quic/moqt/moqt_probe_manager.h
@@ -57,6 +57,8 @@
                                             Callback callback) = 0;
   // Cancels the currently pending probe.
   virtual std::optional<ProbeId> StopProbe() = 0;
+
+  virtual bool HasActiveProbe() const = 0;
 };
 
 namespace test {
@@ -81,6 +83,7 @@
                                     quic::QuicTimeDelta timeout,
                                     Callback callback) override;
   std::optional<ProbeId> StopProbe() override;
+  bool HasActiveProbe() const override { return probe_.has_value(); }
 
  private:
   friend class ::moqt::test::MoqtProbeManagerPeer;
diff --git a/quiche/quic/moqt/test_tools/moqt_simulator.cc b/quiche/quic/moqt/test_tools/moqt_simulator.cc
index c1448ec..80fac10 100644
--- a/quiche/quic/moqt/test_tools/moqt_simulator.cc
+++ b/quiche/quic/moqt/test_tools/moqt_simulator.cc
@@ -268,7 +268,7 @@
                  TrackName(), parameters.keyframe_interval, parameters.fps,
                  parameters.i_to_p_ratio, parameters.bitrate),
       adjuster_(simulator_.GetClock(), client_endpoint_.session()->session(),
-                &generator_),
+                simulator_.GetAlarmFactory(), &generator_),
       parameters_(parameters) {
   if (parameters.aggregation_threshold > 0) {
     QuicTimeDelta timeout = parameters.aggregation_timeout;
diff --git a/quiche/quic/moqt/test_tools/moqt_simulator_test.cc b/quiche/quic/moqt/test_tools/moqt_simulator_test.cc
index 0d37eab..dd0ab58 100644
--- a/quiche/quic/moqt/test_tools/moqt_simulator_test.cc
+++ b/quiche/quic/moqt/test_tools/moqt_simulator_test.cc
@@ -42,14 +42,24 @@
   MoqtSimulator simulator(SimulationParameters{});
   simulator.Run();
   EXPECT_NEAR(simulator.received_on_time_fraction(), 1.0f, 0.001f);
-  EXPECT_EQ(CountEventType(simulator.client_trace(),
-                           EventType::MOQT_TARGET_BITRATE_SET),
-            0);
 
   EXPECT_EQ(GetCongestionControlType(*simulator.client_quic_session()),
             quic::CongestionControlType::kBBR);
   EXPECT_EQ(GetCongestionControlType(*simulator.server_quic_session()),
             quic::CongestionControlType::kBBR);
+
+  int bitrate_events = 0;
+  for (const quic_trace::Event& event : simulator.client_trace().events()) {
+    if (event.event_type() != EventType::MOQT_TARGET_BITRATE_SET) {
+      continue;
+    }
+    ++bitrate_events;
+    EXPECT_GT(event.bandwidth_estimate_bps(),
+              SimulationParameters().bitrate.ToBitsPerSecond());
+    EXPECT_LT(event.bandwidth_estimate_bps(),
+              SimulationParameters().bandwidth.ToBitsPerSecond());
+  }
+  EXPECT_GT(bitrate_events, 0);
 }
 
 // Ensure that the bitrate adaptation down works.
diff --git a/quiche/quic/moqt/tools/moqt_simulator_bin.cc b/quiche/quic/moqt/tools/moqt_simulator_bin.cc
index 8bf8874..6ef6c37 100644
--- a/quiche/quic/moqt/tools/moqt_simulator_bin.cc
+++ b/quiche/quic/moqt/tools/moqt_simulator_bin.cc
@@ -33,6 +33,11 @@
     "Bandwidth of the simulated link, in kilobits per second.");
 
 DEFINE_QUICHE_COMMAND_LINE_FLAG(
+    uint64_t, bitrate,
+    moqt::test::SimulationParameters().bitrate.ToKBitsPerSecond(),
+    "Initial bitrate of the simulated media.");
+
+DEFINE_QUICHE_COMMAND_LINE_FLAG(
     absl::Duration, deadline,
     moqt::test::SimulationParameters().deadline.ToAbsl(),
     "Frame delivery deadline (used for measurement only).");
@@ -102,6 +107,8 @@
   quiche::QuicheParseCommandLineFlags("moqt_simulator", argc, argv);
   parameters.bandwidth = quic::QuicBandwidth::FromKBitsPerSecond(
       quiche::GetQuicheCommandLineFlag(FLAGS_bandwidth));
+  parameters.bitrate = quic::QuicBandwidth::FromKBitsPerSecond(
+      quiche::GetQuicheCommandLineFlag(FLAGS_bitrate));
   parameters.deadline =
       quic::QuicTimeDelta(quiche::GetQuicheCommandLineFlag(FLAGS_deadline));
   parameters.duration =