Integrate reordering into the bitrate adjuster algorithm.
Also tweak some magic numbers, including the flow control windows.
PiperOrigin-RevId: 834798752
diff --git a/quiche/quic/moqt/moqt_bitrate_adjuster.cc b/quiche/quic/moqt/moqt_bitrate_adjuster.cc
index 58079c4..3a99e6f 100644
--- a/quiche/quic/moqt/moqt_bitrate_adjuster.cc
+++ b/quiche/quic/moqt/moqt_bitrate_adjuster.cc
@@ -4,7 +4,6 @@
#include "quiche/quic/moqt/moqt_bitrate_adjuster.h"
-#include <cstdint>
#include <cstdlib>
#include <optional>
@@ -32,31 +31,61 @@
return;
}
start_time_ = clock_->Now();
+ outstanding_objects_.emplace(
+ /*max_out_of_order_objects=*/parameters_.max_out_of_order_objects);
}
void MoqtBitrateAdjuster::OnObjectAckReceived(
- Location /*location*/, QuicTimeDelta delta_from_deadline) {
- if (!start_time_.IsInitialized()) {
+ Location location, QuicTimeDelta delta_from_deadline) {
+ if (!start_time_.IsInitialized() || !outstanding_objects_.has_value()) {
return;
}
- const QuicTime earliest_action_time = start_time_ + parameters_.initial_delay;
- if (clock_->Now() < earliest_action_time) {
+ // Update the state.
+ int reordering_delta = outstanding_objects_->OnObjectAcked(location);
+
+ // Decide whether to act based on the latest signal.
+ if (!ShouldUseAckAsActionSignal(location)) {
return;
}
-
- if (delta_from_deadline < QuicTimeDelta::Zero()) {
- // While adjusting down upon the first sign of packets getting late might
- // seem aggressive, note that:
- // - By the time user occurs, this is already a user-visible issue (so, in
- // some sense, this isn't aggressive enough).
- // - The adjustment won't happen if we're already bellow `k * max_bw`, so
- // if the delays are due to other factors like bufferbloat, the measured
- // bandwidth will likely not result in a downwards adjustment.
+ if (ShouldAttemptAdjustingDown(reordering_delta, delta_from_deadline)) {
AttemptAdjustingDown();
}
}
+bool MoqtBitrateAdjuster::ShouldUseAckAsActionSignal(Location location) {
+ // Allow for some time to pass for the connection to reach the point at which
+ // the rate adaptation signals can become useful.
+ const QuicTime earliest_action_time = start_time_ + parameters_.initial_delay;
+ const bool too_early_in_the_connection = clock_->Now() < earliest_action_time;
+
+ // Ignore out-of-order acks for the purpose of deciding whether to adjust up
+ // or down. Generally, if an ack is out of order, the bitrate adjuster has
+ // already reacted to the later object appropriately.
+ const bool is_out_of_order_ack = location < last_acked_object_;
+ last_acked_object_ = location;
+
+ return !too_early_in_the_connection && !is_out_of_order_ack;
+}
+
+bool MoqtBitrateAdjuster::ShouldAttemptAdjustingDown(
+ int reordering_delta, quic::QuicTimeDelta delta_from_deadline) const {
+ const bool has_exceeded_max_out_of_order =
+ reordering_delta > parameters_.max_out_of_order_objects;
+ QUICHE_DLOG_IF(INFO, has_exceeded_max_out_of_order)
+ << "Adjusting connection down due to reordering, delta: "
+ << reordering_delta;
+
+ const bool time_delta_too_close =
+ delta_from_deadline < parameters_.adjust_down_threshold * time_window_;
+ QUICHE_DLOG_IF(INFO, time_delta_too_close)
+ << "Adjusting connection down due to object arriving too late, time "
+ "delta: "
+ << delta_from_deadline;
+
+ return has_exceeded_max_out_of_order || time_delta_too_close;
+}
+
void MoqtBitrateAdjuster::AttemptAdjustingDown() {
webtransport::SessionStats stats = session_->GetSessionStats();
QuicBandwidth target_bandwidth =
@@ -109,4 +138,11 @@
trace_recorder_.RecordTargetBitrateSet(bitrate);
}
+void MoqtBitrateAdjuster::OnNewObjectEnqueued(Location location) {
+ if (!start_time_.IsInitialized() || !outstanding_objects_.has_value()) {
+ return;
+ }
+ outstanding_objects_->OnObjectAdded(location);
+}
+
} // namespace moqt
diff --git a/quiche/quic/moqt/moqt_bitrate_adjuster.h b/quiche/quic/moqt/moqt_bitrate_adjuster.h
index 050e696..917bbcf 100644
--- a/quiche/quic/moqt/moqt_bitrate_adjuster.h
+++ b/quiche/quic/moqt/moqt_bitrate_adjuster.h
@@ -5,13 +5,13 @@
#ifndef QUICHE_QUIC_MOQT_MOQT_BITRATE_ADJUSTER_H_
#define QUICHE_QUIC_MOQT_MOQT_BITRATE_ADJUSTER_H_
-#include <cstdint>
#include <optional>
#include "quiche/quic/core/quic_bandwidth.h"
#include "quiche/quic/core/quic_clock.h"
#include "quiche/quic/core/quic_time.h"
#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_outstanding_objects.h"
#include "quiche/quic/moqt/moqt_session.h"
#include "quiche/quic/moqt/moqt_trace_recorder.h"
#include "quiche/web_transport/web_transport.h"
@@ -55,11 +55,22 @@
// When bitrate is adjusted down, multiply the congestion controller estimate
// by this factor. This should be less than 1, since congestion controller
// estimate tends to be overly optimistic in practice.
- float target_bitrate_multiplier_down = 0.95f;
+ float target_bitrate_multiplier_down = 0.9f;
// Do not perform any updates within `initial_delay` after the connection
// start.
quic::QuicTimeDelta initial_delay = quic::QuicTimeDelta::FromSeconds(2);
+
+ // If the object arrives too close to the deadline, the bitrate will be
+ // adjusted down. The threshold is expressed as a fraction of `time_window`
+ // (which typically would be equal to the size of the buffer in seconds).
+ float adjust_down_threshold = 0.1f;
+
+ // The maximum gap between the next object expected to be received, and the
+ // actually received object, expressed as a number of objects.
+ //
+ // The default is 12, which corresponds to about 400ms for 30fps video.
+ int max_out_of_order_objects = 12;
};
// MoqtBitrateAdjuster monitors the progress of delivery for a single track, and
@@ -74,14 +85,24 @@
// MoqtPublishingMonitorInterface implementation.
void OnObjectAckSupportKnown(
std::optional<quic::QuicTimeDelta> time_window) override;
+ void OnNewObjectEnqueued(Location location) override;
void OnObjectAckReceived(Location location,
quic::QuicTimeDelta delta_from_deadline) override;
MoqtTraceRecorder& trace_recorder() { return trace_recorder_; }
+ MoqtBitrateAdjusterParameters& parameters() { return parameters_; }
private:
void Start();
+ // Checks if the bitrate adjuster should react to an individual ack.
+ bool ShouldUseAckAsActionSignal(Location location);
+
+ // Checks if the bitrate should be adjusted down based on the result of
+ // processing an object ACK.
+ bool ShouldAttemptAdjustingDown(
+ int reordering_delta, quic::QuicTimeDelta delta_from_deadline) const;
+
// Attempts adjusting the bitrate down.
void AttemptAdjustingDown();
@@ -93,8 +114,16 @@
BitrateAdjustable* adjustable_; // Not owned.
MoqtTraceRecorder trace_recorder_;
MoqtBitrateAdjusterParameters parameters_;
+
+ // The time at which Start() has been called.
quic::QuicTime start_time_ = quic::QuicTime::Zero();
+
+ // The window size received from the peer. This amount is used to establish
+ // the scale for incoming time deltas in the object ACKs.
quic::QuicTimeDelta time_window_ = quic::QuicTimeDelta::Zero();
+
+ std::optional<MoqtOutstandingObjects> outstanding_objects_;
+ Location last_acked_object_;
};
// Given a suggestion to change bitrate `old_bitrate` to `new_bitrate` with the
diff --git a/quiche/quic/moqt/moqt_bitrate_adjuster_test.cc b/quiche/quic/moqt/moqt_bitrate_adjuster_test.cc
index 95be2b2..aed88ce 100644
--- a/quiche/quic/moqt/moqt_bitrate_adjuster_test.cc
+++ b/quiche/quic/moqt/moqt_bitrate_adjuster_test.cc
@@ -67,16 +67,23 @@
MoqtBitrateAdjuster adjuster_;
};
+TEST_F(MoqtBitrateAdjusterTest, IgnoreCallsBeforeStart) {
+ MoqtBitrateAdjuster uninitialized_adjuster(&clock_, &session_, &adjustable_);
+ uninitialized_adjuster.OnNewObjectEnqueued(Location(1, 0));
+ uninitialized_adjuster.OnObjectAckReceived(
+ Location(1, 0), QuicTimeDelta::FromMilliseconds(100));
+}
+
TEST_F(MoqtBitrateAdjusterTest, SteadyState) {
// The fact that estimated bitrate is 1bps should not matter, since we never
// have a reason to adjust down.
stats_.estimated_send_rate_bps = 1;
- EXPECT_CALL(adjustable_, OnBitrateAdjusted(_)).Times(0);
+ EXPECT_CALL(adjustable_, OnBitrateAdjusted).Times(0);
for (int i = 0; i < 250; ++i) {
clock_.AdvanceTime(kDefaultRtt);
for (int j = 0; j < 10; ++j) {
- adjuster_.OnObjectAckReceived(Location(i, j), kDefaultRtt * 2);
+ adjuster_.OnObjectAckReceived(Location(i, j), kDefaultTimeScale * 0.9);
}
}
}
@@ -117,6 +124,38 @@
EXPECT_EQ(adjusted_times, 2);
}
+TEST_F(MoqtBitrateAdjusterTest, OutOfOrderAckIgnored) {
+ int adjusted_times = 0;
+ EXPECT_CALL(adjustable_, OnBitrateAdjusted).WillRepeatedly([&] {
+ ++adjusted_times;
+ });
+
+ clock_.AdvanceTime(100 * kDefaultRtt);
+ stats_.estimated_send_rate_bps = (0.5 * kDefaultBitrate).ToBitsPerSecond();
+ adjuster_.OnObjectAckReceived(Location(0, 1),
+ QuicTimeDelta::FromMilliseconds(-1));
+ EXPECT_EQ(adjusted_times, 1);
+
+ clock_.AdvanceTime(100 * kDefaultRtt);
+ stats_.estimated_send_rate_bps = (0.25 * kDefaultBitrate).ToBitsPerSecond();
+ adjuster_.OnObjectAckReceived(Location(0, 0),
+ QuicTimeDelta::FromMilliseconds(-1));
+ EXPECT_EQ(adjusted_times, 1);
+}
+
+TEST_F(MoqtBitrateAdjusterTest, Reordering) {
+ adjuster_.parameters().max_out_of_order_objects = 1;
+ clock_.AdvanceTime(100 * kDefaultRtt);
+ stats_.estimated_send_rate_bps = (0.5 * kDefaultBitrate).ToBitsPerSecond();
+
+ adjuster_.OnNewObjectEnqueued(Location(0, 0));
+ adjuster_.OnNewObjectEnqueued(Location(0, 1));
+ adjuster_.OnNewObjectEnqueued(Location(0, 2));
+
+ EXPECT_CALL(adjustable_, OnBitrateAdjusted);
+ adjuster_.OnObjectAckReceived(Location(0, 2), kDefaultTimeScale);
+}
+
TEST_F(MoqtBitrateAdjusterTest, ShouldIgnoreBitrateAdjustment) {
constexpr quic::QuicBandwidth kOldBandwith =
quic::QuicBandwidth::FromKBitsPerSecond(1024);
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc
index 046eab9..a887b43 100644
--- a/quiche/quic/moqt/moqt_integration_test.cc
+++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -714,10 +714,12 @@
MoqtKnownTrackPublisher publisher;
server_->session()->set_publisher(&publisher);
- auto track_publisher = std::make_shared<MockTrackPublisher>(full_track_name);
+ auto track_publisher = std::make_shared<MoqtOutgoingQueue>(
+ full_track_name, MoqtForwardingPreference::kSubgroup,
+ test_harness_.simulator().GetClock());
publisher.Add(track_publisher);
- MockPublishingMonitorInterface monitoring;
+ testing::StrictMock<MockPublishingMonitorInterface> monitoring;
server_->session()->SetMonitoringInterfaceForTrack(full_track_name,
&monitoring);
@@ -726,10 +728,6 @@
.WillOnce([&](MoqtObjectAckFunction new_ack_function) {
ack_function = std::move(new_ack_function);
});
- EXPECT_CALL(*track_publisher, AddObjectListener)
- .WillOnce([&](MoqtObjectListener* listener) {
- listener->OnSubscribeAccepted();
- });
EXPECT_CALL(subscribe_visitor_, OnReply)
.WillOnce([&](const FullTrackName&,
std::variant<SubscribeOkData, MoqtRequestError>) {
@@ -739,10 +737,6 @@
VersionSpecificParameters parameters;
parameters.oack_window_size = quic::QuicTimeDelta::FromMilliseconds(100);
- ON_CALL(*track_publisher, expiration)
- .WillByDefault(Return(quic::QuicTimeDelta::Zero()));
- ON_CALL(*track_publisher, delivery_order)
- .WillByDefault(Return(MoqtDeliveryOrder::kAscending));
client_->session()->SubscribeCurrentObject(full_track_name,
&subscribe_visitor_, parameters);
EXPECT_CALL(monitoring, OnObjectAckSupportKnown(parameters.oack_window_size));
@@ -757,6 +751,9 @@
bool success = test_harness_.RunUntilWithDefaultTimeout([&] { return done; });
EXPECT_TRUE(success);
+ EXPECT_CALL(monitoring, OnNewObjectEnqueued(Location(0, 0)));
+ track_publisher->AddObject(QuicheMemSlice::Copy("test"), true);
+
const quic_trace::Trace& trace = *server_->trace_visitor()->trace();
std::vector<int64_t> ack_deltas;
for (const quic_trace::Event& event : trace.events()) {
diff --git a/quiche/quic/moqt/moqt_quic_config.cc b/quiche/quic/moqt/moqt_quic_config.cc
index 415eba4..8cefd5f 100644
--- a/quiche/quic/moqt/moqt_quic_config.cc
+++ b/quiche/quic/moqt/moqt_quic_config.cc
@@ -22,7 +22,7 @@
// A typical I-frame in a high-bitrate FHD video tends to be in the low 100 KiB
// range. Even for a higher-latency connection such as 100ms, that would imply
// an instantaneous bitrate of 10 Mbps.
-constexpr QuicByteCount kDefaultInitialStreamWindow = 128 * 1024;
+constexpr QuicByteCount kDefaultInitialStreamWindow = 512 * 1024;
// The flow control window autotuning does work with connection-level flow
// control, but we still can make the startup smoother by setting a more
@@ -30,7 +30,7 @@
// much more than a single data stream at a time, since for most of the MOQT
// users the bandwidth usage would be dominated by a single track.
constexpr QuicByteCount kDefaultInitialConnectionWindow =
- 2 * kDefaultInitialStreamWindow;
+ 3 * kDefaultInitialStreamWindow;
} // namespace
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 6b602c0..7f8abeb 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -2093,8 +2093,26 @@
if (!InWindow(location)) {
return;
}
+
+ if (monitoring_interface_ != nullptr) {
+ // Notify the monitoring interface about all newly published normal objects.
+ // Objects with other statuses are not guaranteed to be acknowledged, thus
+ // passing them into the monitoring interface can lead to confusion.
+ std::optional<PublishedObject> object = track_publisher_->GetCachedObject(
+ location.group, subgroup, location.object);
+ QUICHE_DCHECK(object.has_value())
+ << "Object " << absl::StrCat(location) << " on track "
+ << track_publisher_->GetTrackName().ToString()
+ << " does not exist, despite OnNewObjectAvailable being called";
+ if (object.has_value() && object->metadata.location == location &&
+ object->metadata.status == MoqtObjectStatus::kNormal) {
+ monitoring_interface_->OnNewObjectEnqueued(location);
+ }
+ }
+
session_->trace_recorder_.RecordNewObjectAvaliable(
track_alias_, *track_publisher_, location, subgroup, publisher_priority);
+
DataStreamIndex index(location.group, subgroup);
if (reset_subgroups_.contains(index)) {
// This subgroup has already been reset, ignore.
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index 90e8bc2..859123a 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -66,6 +66,7 @@
virtual void OnObjectAckSupportKnown(
std::optional<quic::QuicTimeDelta> time_window) = 0;
+ virtual void OnNewObjectEnqueued(Location location) = 0;
virtual void OnObjectAckReceived(Location location,
quic::QuicTimeDelta delta_from_deadline) = 0;
};
diff --git a/quiche/quic/moqt/test_tools/moqt_mock_visitor.h b/quiche/quic/moqt/test_tools/moqt_mock_visitor.h
index 433f4a9..b34f7e4 100644
--- a/quiche/quic/moqt/test_tools/moqt_mock_visitor.h
+++ b/quiche/quic/moqt/test_tools/moqt_mock_visitor.h
@@ -211,6 +211,7 @@
public:
MOCK_METHOD(void, OnObjectAckSupportKnown,
(std::optional<quic::QuicTimeDelta> time_window), (override));
+ MOCK_METHOD(void, OnNewObjectEnqueued, (Location location), (override));
MOCK_METHOD(void, OnObjectAckReceived,
(Location location, quic::QuicTimeDelta delta_from_deadline),
(override));
diff --git a/quiche/quic/moqt/test_tools/moqt_simulator_test.cc b/quiche/quic/moqt/test_tools/moqt_simulator_test.cc
index f09e973..0d37eab 100644
--- a/quiche/quic/moqt/test_tools/moqt_simulator_test.cc
+++ b/quiche/quic/moqt/test_tools/moqt_simulator_test.cc
@@ -61,14 +61,10 @@
MoqtSimulator simulator(parameters);
simulator.Run();
EXPECT_GE(simulator.received_on_time_fraction(), 0.8f);
- EXPECT_LT(simulator.received_on_time_fraction(), 0.99f);
- // TODO(vasilvv): re-enable once rate adaptation works well enough so that
- // this is no longer flaky.
-#if 0
+ EXPECT_LT(simulator.received_on_time_fraction(), 0.999f);
EXPECT_GT(CountEventType(simulator.client_trace(),
EventType::MOQT_TARGET_BITRATE_SET),
0);
-#endif
quic::QuicConnectionStats stats =
simulator.client_quic_session()->connection()->GetStats();