Integrate reordering into the bitrate adjuster algorithm.

Also tweak some magic numbers, including the flow control windows.

PiperOrigin-RevId: 834798752
diff --git a/quiche/quic/moqt/moqt_bitrate_adjuster.cc b/quiche/quic/moqt/moqt_bitrate_adjuster.cc
index 58079c4..3a99e6f 100644
--- a/quiche/quic/moqt/moqt_bitrate_adjuster.cc
+++ b/quiche/quic/moqt/moqt_bitrate_adjuster.cc
@@ -4,7 +4,6 @@
 
 #include "quiche/quic/moqt/moqt_bitrate_adjuster.h"
 
-#include <cstdint>
 #include <cstdlib>
 #include <optional>
 
@@ -32,31 +31,61 @@
     return;
   }
   start_time_ = clock_->Now();
+  outstanding_objects_.emplace(
+      /*max_out_of_order_objects=*/parameters_.max_out_of_order_objects);
 }
 
 void MoqtBitrateAdjuster::OnObjectAckReceived(
-    Location /*location*/, QuicTimeDelta delta_from_deadline) {
-  if (!start_time_.IsInitialized()) {
+    Location location, QuicTimeDelta delta_from_deadline) {
+  if (!start_time_.IsInitialized() || !outstanding_objects_.has_value()) {
     return;
   }
 
-  const QuicTime earliest_action_time = start_time_ + parameters_.initial_delay;
-  if (clock_->Now() < earliest_action_time) {
+  // Update the state.
+  int reordering_delta = outstanding_objects_->OnObjectAcked(location);
+
+  // Decide whether to act based on the latest signal.
+  if (!ShouldUseAckAsActionSignal(location)) {
     return;
   }
-
-  if (delta_from_deadline < QuicTimeDelta::Zero()) {
-    // While adjusting down upon the first sign of packets getting late might
-    // seem aggressive, note that:
-    //   - By the time user occurs, this is already a user-visible issue (so, in
-    //     some sense, this isn't aggressive enough).
-    //   - The adjustment won't happen if we're already bellow `k * max_bw`, so
-    //     if the delays are due to other factors like bufferbloat, the measured
-    //     bandwidth will likely not result in a downwards adjustment.
+  if (ShouldAttemptAdjustingDown(reordering_delta, delta_from_deadline)) {
     AttemptAdjustingDown();
   }
 }
 
+bool MoqtBitrateAdjuster::ShouldUseAckAsActionSignal(Location location) {
+  // Allow for some time to pass for the connection to reach the point at which
+  // the rate adaptation signals can become useful.
+  const QuicTime earliest_action_time = start_time_ + parameters_.initial_delay;
+  const bool too_early_in_the_connection = clock_->Now() < earliest_action_time;
+
+  // Ignore out-of-order acks for the purpose of deciding whether to adjust up
+  // or down.  Generally, if an ack is out of order, the bitrate adjuster has
+  // already reacted to the later object appropriately.
+  const bool is_out_of_order_ack = location < last_acked_object_;
+  last_acked_object_ = location;
+
+  return !too_early_in_the_connection && !is_out_of_order_ack;
+}
+
+bool MoqtBitrateAdjuster::ShouldAttemptAdjustingDown(
+    int reordering_delta, quic::QuicTimeDelta delta_from_deadline) const {
+  const bool has_exceeded_max_out_of_order =
+      reordering_delta > parameters_.max_out_of_order_objects;
+  QUICHE_DLOG_IF(INFO, has_exceeded_max_out_of_order)
+      << "Adjusting connection down due to reordering, delta: "
+      << reordering_delta;
+
+  const bool time_delta_too_close =
+      delta_from_deadline < parameters_.adjust_down_threshold * time_window_;
+  QUICHE_DLOG_IF(INFO, time_delta_too_close)
+      << "Adjusting connection down due to object arriving too late, time "
+         "delta: "
+      << delta_from_deadline;
+
+  return has_exceeded_max_out_of_order || time_delta_too_close;
+}
+
 void MoqtBitrateAdjuster::AttemptAdjustingDown() {
   webtransport::SessionStats stats = session_->GetSessionStats();
   QuicBandwidth target_bandwidth =
@@ -109,4 +138,11 @@
   trace_recorder_.RecordTargetBitrateSet(bitrate);
 }
 
+void MoqtBitrateAdjuster::OnNewObjectEnqueued(Location location) {
+  if (!start_time_.IsInitialized() || !outstanding_objects_.has_value()) {
+    return;
+  }
+  outstanding_objects_->OnObjectAdded(location);
+}
+
 }  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_bitrate_adjuster.h b/quiche/quic/moqt/moqt_bitrate_adjuster.h
index 050e696..917bbcf 100644
--- a/quiche/quic/moqt/moqt_bitrate_adjuster.h
+++ b/quiche/quic/moqt/moqt_bitrate_adjuster.h
@@ -5,13 +5,13 @@
 #ifndef QUICHE_QUIC_MOQT_MOQT_BITRATE_ADJUSTER_H_
 #define QUICHE_QUIC_MOQT_MOQT_BITRATE_ADJUSTER_H_
 
-#include <cstdint>
 #include <optional>
 
 #include "quiche/quic/core/quic_bandwidth.h"
 #include "quiche/quic/core/quic_clock.h"
 #include "quiche/quic/core/quic_time.h"
 #include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_outstanding_objects.h"
 #include "quiche/quic/moqt/moqt_session.h"
 #include "quiche/quic/moqt/moqt_trace_recorder.h"
 #include "quiche/web_transport/web_transport.h"
@@ -55,11 +55,22 @@
   // When bitrate is adjusted down, multiply the congestion controller estimate
   // by this factor.  This should be less than 1, since congestion controller
   // estimate tends to be overly optimistic in practice.
-  float target_bitrate_multiplier_down = 0.95f;
+  float target_bitrate_multiplier_down = 0.9f;
 
   // Do not perform any updates within `initial_delay` after the connection
   // start.
   quic::QuicTimeDelta initial_delay = quic::QuicTimeDelta::FromSeconds(2);
+
+  // If the object arrives too close to the deadline, the bitrate will be
+  // adjusted down.  The threshold is expressed as a fraction of `time_window`
+  // (which typically would be equal to the size of the buffer in seconds).
+  float adjust_down_threshold = 0.1f;
+
+  // The maximum gap between the next object expected to be received, and the
+  // actually received object, expressed as a number of objects.
+  //
+  // The default is 12, which corresponds to about 400ms for 30fps video.
+  int max_out_of_order_objects = 12;
 };
 
 // MoqtBitrateAdjuster monitors the progress of delivery for a single track, and
@@ -74,14 +85,24 @@
   // MoqtPublishingMonitorInterface implementation.
   void OnObjectAckSupportKnown(
       std::optional<quic::QuicTimeDelta> time_window) override;
+  void OnNewObjectEnqueued(Location location) override;
   void OnObjectAckReceived(Location location,
                            quic::QuicTimeDelta delta_from_deadline) override;
 
   MoqtTraceRecorder& trace_recorder() { return trace_recorder_; }
+  MoqtBitrateAdjusterParameters& parameters() { return parameters_; }
 
  private:
   void Start();
 
+  // Checks if the bitrate adjuster should react to an individual ack.
+  bool ShouldUseAckAsActionSignal(Location location);
+
+  // Checks if the bitrate should be adjusted down based on the result of
+  // processing an object ACK.
+  bool ShouldAttemptAdjustingDown(
+      int reordering_delta, quic::QuicTimeDelta delta_from_deadline) const;
+
   // Attempts adjusting the bitrate down.
   void AttemptAdjustingDown();
 
@@ -93,8 +114,16 @@
   BitrateAdjustable* adjustable_;   // Not owned.
   MoqtTraceRecorder trace_recorder_;
   MoqtBitrateAdjusterParameters parameters_;
+
+  // The time at which Start() has been called.
   quic::QuicTime start_time_ = quic::QuicTime::Zero();
+
+  // The window size received from the peer.  This amount is used to establish
+  // the scale for incoming time deltas in the object ACKs.
   quic::QuicTimeDelta time_window_ = quic::QuicTimeDelta::Zero();
+
+  std::optional<MoqtOutstandingObjects> outstanding_objects_;
+  Location last_acked_object_;
 };
 
 // Given a suggestion to change bitrate `old_bitrate` to `new_bitrate` with the
diff --git a/quiche/quic/moqt/moqt_bitrate_adjuster_test.cc b/quiche/quic/moqt/moqt_bitrate_adjuster_test.cc
index 95be2b2..aed88ce 100644
--- a/quiche/quic/moqt/moqt_bitrate_adjuster_test.cc
+++ b/quiche/quic/moqt/moqt_bitrate_adjuster_test.cc
@@ -67,16 +67,23 @@
   MoqtBitrateAdjuster adjuster_;
 };
 
+TEST_F(MoqtBitrateAdjusterTest, IgnoreCallsBeforeStart) {
+  MoqtBitrateAdjuster uninitialized_adjuster(&clock_, &session_, &adjustable_);
+  uninitialized_adjuster.OnNewObjectEnqueued(Location(1, 0));
+  uninitialized_adjuster.OnObjectAckReceived(
+      Location(1, 0), QuicTimeDelta::FromMilliseconds(100));
+}
+
 TEST_F(MoqtBitrateAdjusterTest, SteadyState) {
   // The fact that estimated bitrate is 1bps should not matter, since we never
   // have a reason to adjust down.
   stats_.estimated_send_rate_bps = 1;
 
-  EXPECT_CALL(adjustable_, OnBitrateAdjusted(_)).Times(0);
+  EXPECT_CALL(adjustable_, OnBitrateAdjusted).Times(0);
   for (int i = 0; i < 250; ++i) {
     clock_.AdvanceTime(kDefaultRtt);
     for (int j = 0; j < 10; ++j) {
-      adjuster_.OnObjectAckReceived(Location(i, j), kDefaultRtt * 2);
+      adjuster_.OnObjectAckReceived(Location(i, j), kDefaultTimeScale * 0.9);
     }
   }
 }
@@ -117,6 +124,38 @@
   EXPECT_EQ(adjusted_times, 2);
 }
 
+TEST_F(MoqtBitrateAdjusterTest, OutOfOrderAckIgnored) {
+  int adjusted_times = 0;
+  EXPECT_CALL(adjustable_, OnBitrateAdjusted).WillRepeatedly([&] {
+    ++adjusted_times;
+  });
+
+  clock_.AdvanceTime(100 * kDefaultRtt);
+  stats_.estimated_send_rate_bps = (0.5 * kDefaultBitrate).ToBitsPerSecond();
+  adjuster_.OnObjectAckReceived(Location(0, 1),
+                                QuicTimeDelta::FromMilliseconds(-1));
+  EXPECT_EQ(adjusted_times, 1);
+
+  clock_.AdvanceTime(100 * kDefaultRtt);
+  stats_.estimated_send_rate_bps = (0.25 * kDefaultBitrate).ToBitsPerSecond();
+  adjuster_.OnObjectAckReceived(Location(0, 0),
+                                QuicTimeDelta::FromMilliseconds(-1));
+  EXPECT_EQ(adjusted_times, 1);
+}
+
+TEST_F(MoqtBitrateAdjusterTest, Reordering) {
+  adjuster_.parameters().max_out_of_order_objects = 1;
+  clock_.AdvanceTime(100 * kDefaultRtt);
+  stats_.estimated_send_rate_bps = (0.5 * kDefaultBitrate).ToBitsPerSecond();
+
+  adjuster_.OnNewObjectEnqueued(Location(0, 0));
+  adjuster_.OnNewObjectEnqueued(Location(0, 1));
+  adjuster_.OnNewObjectEnqueued(Location(0, 2));
+
+  EXPECT_CALL(adjustable_, OnBitrateAdjusted);
+  adjuster_.OnObjectAckReceived(Location(0, 2), kDefaultTimeScale);
+}
+
 TEST_F(MoqtBitrateAdjusterTest, ShouldIgnoreBitrateAdjustment) {
   constexpr quic::QuicBandwidth kOldBandwith =
       quic::QuicBandwidth::FromKBitsPerSecond(1024);
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc
index 046eab9..a887b43 100644
--- a/quiche/quic/moqt/moqt_integration_test.cc
+++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -714,10 +714,12 @@
 
   MoqtKnownTrackPublisher publisher;
   server_->session()->set_publisher(&publisher);
-  auto track_publisher = std::make_shared<MockTrackPublisher>(full_track_name);
+  auto track_publisher = std::make_shared<MoqtOutgoingQueue>(
+      full_track_name, MoqtForwardingPreference::kSubgroup,
+      test_harness_.simulator().GetClock());
   publisher.Add(track_publisher);
 
-  MockPublishingMonitorInterface monitoring;
+  testing::StrictMock<MockPublishingMonitorInterface> monitoring;
   server_->session()->SetMonitoringInterfaceForTrack(full_track_name,
                                                      &monitoring);
 
@@ -726,10 +728,6 @@
       .WillOnce([&](MoqtObjectAckFunction new_ack_function) {
         ack_function = std::move(new_ack_function);
       });
-  EXPECT_CALL(*track_publisher, AddObjectListener)
-      .WillOnce([&](MoqtObjectListener* listener) {
-        listener->OnSubscribeAccepted();
-      });
   EXPECT_CALL(subscribe_visitor_, OnReply)
       .WillOnce([&](const FullTrackName&,
                     std::variant<SubscribeOkData, MoqtRequestError>) {
@@ -739,10 +737,6 @@
 
   VersionSpecificParameters parameters;
   parameters.oack_window_size = quic::QuicTimeDelta::FromMilliseconds(100);
-  ON_CALL(*track_publisher, expiration)
-      .WillByDefault(Return(quic::QuicTimeDelta::Zero()));
-  ON_CALL(*track_publisher, delivery_order)
-      .WillByDefault(Return(MoqtDeliveryOrder::kAscending));
   client_->session()->SubscribeCurrentObject(full_track_name,
                                              &subscribe_visitor_, parameters);
   EXPECT_CALL(monitoring, OnObjectAckSupportKnown(parameters.oack_window_size));
@@ -757,6 +751,9 @@
   bool success = test_harness_.RunUntilWithDefaultTimeout([&] { return done; });
   EXPECT_TRUE(success);
 
+  EXPECT_CALL(monitoring, OnNewObjectEnqueued(Location(0, 0)));
+  track_publisher->AddObject(QuicheMemSlice::Copy("test"), true);
+
   const quic_trace::Trace& trace = *server_->trace_visitor()->trace();
   std::vector<int64_t> ack_deltas;
   for (const quic_trace::Event& event : trace.events()) {
diff --git a/quiche/quic/moqt/moqt_quic_config.cc b/quiche/quic/moqt/moqt_quic_config.cc
index 415eba4..8cefd5f 100644
--- a/quiche/quic/moqt/moqt_quic_config.cc
+++ b/quiche/quic/moqt/moqt_quic_config.cc
@@ -22,7 +22,7 @@
 // A typical I-frame in a high-bitrate FHD video tends to be in the low 100 KiB
 // range. Even for a higher-latency connection such as 100ms, that would imply
 // an instantaneous bitrate of 10 Mbps.
-constexpr QuicByteCount kDefaultInitialStreamWindow = 128 * 1024;
+constexpr QuicByteCount kDefaultInitialStreamWindow = 512 * 1024;
 
 // The flow control window autotuning does work with connection-level flow
 // control, but we still can make the startup smoother by setting a more
@@ -30,7 +30,7 @@
 // much more than a single data stream at a time, since for most of the MOQT
 // users the bandwidth usage would be dominated by a single track.
 constexpr QuicByteCount kDefaultInitialConnectionWindow =
-    2 * kDefaultInitialStreamWindow;
+    3 * kDefaultInitialStreamWindow;
 
 }  // namespace
 
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 6b602c0..7f8abeb 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -2093,8 +2093,26 @@
   if (!InWindow(location)) {
     return;
   }
+
+  if (monitoring_interface_ != nullptr) {
+    // Notify the monitoring interface about all newly published normal objects.
+    // Objects with other statuses are not guaranteed to be acknowledged, thus
+    // passing them into the monitoring interface can lead to confusion.
+    std::optional<PublishedObject> object = track_publisher_->GetCachedObject(
+        location.group, subgroup, location.object);
+    QUICHE_DCHECK(object.has_value())
+        << "Object " << absl::StrCat(location) << " on track "
+        << track_publisher_->GetTrackName().ToString()
+        << " does not exist, despite OnNewObjectAvailable being called";
+    if (object.has_value() && object->metadata.location == location &&
+        object->metadata.status == MoqtObjectStatus::kNormal) {
+      monitoring_interface_->OnNewObjectEnqueued(location);
+    }
+  }
+
   session_->trace_recorder_.RecordNewObjectAvaliable(
       track_alias_, *track_publisher_, location, subgroup, publisher_priority);
+
   DataStreamIndex index(location.group, subgroup);
   if (reset_subgroups_.contains(index)) {
     // This subgroup has already been reset, ignore.
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index 90e8bc2..859123a 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -66,6 +66,7 @@
 
   virtual void OnObjectAckSupportKnown(
       std::optional<quic::QuicTimeDelta> time_window) = 0;
+  virtual void OnNewObjectEnqueued(Location location) = 0;
   virtual void OnObjectAckReceived(Location location,
                                    quic::QuicTimeDelta delta_from_deadline) = 0;
 };
diff --git a/quiche/quic/moqt/test_tools/moqt_mock_visitor.h b/quiche/quic/moqt/test_tools/moqt_mock_visitor.h
index 433f4a9..b34f7e4 100644
--- a/quiche/quic/moqt/test_tools/moqt_mock_visitor.h
+++ b/quiche/quic/moqt/test_tools/moqt_mock_visitor.h
@@ -211,6 +211,7 @@
  public:
   MOCK_METHOD(void, OnObjectAckSupportKnown,
               (std::optional<quic::QuicTimeDelta> time_window), (override));
+  MOCK_METHOD(void, OnNewObjectEnqueued, (Location location), (override));
   MOCK_METHOD(void, OnObjectAckReceived,
               (Location location, quic::QuicTimeDelta delta_from_deadline),
               (override));
diff --git a/quiche/quic/moqt/test_tools/moqt_simulator_test.cc b/quiche/quic/moqt/test_tools/moqt_simulator_test.cc
index f09e973..0d37eab 100644
--- a/quiche/quic/moqt/test_tools/moqt_simulator_test.cc
+++ b/quiche/quic/moqt/test_tools/moqt_simulator_test.cc
@@ -61,14 +61,10 @@
   MoqtSimulator simulator(parameters);
   simulator.Run();
   EXPECT_GE(simulator.received_on_time_fraction(), 0.8f);
-  EXPECT_LT(simulator.received_on_time_fraction(), 0.99f);
-  // TODO(vasilvv): re-enable once rate adaptation works well enough so that
-  // this is no longer flaky.
-#if 0
+  EXPECT_LT(simulator.received_on_time_fraction(), 0.999f);
   EXPECT_GT(CountEventType(simulator.client_trace(),
                            EventType::MOQT_TARGET_BITRATE_SET),
             0);
-#endif
 
   quic::QuicConnectionStats stats =
       simulator.client_quic_session()->connection()->GetStats();