Add logic to be able to adjust MoQ bitrate downwards. Right now, this uses a very simple algorithm, but we can iterate on it over time. PiperOrigin-RevId: 673865918
diff --git a/build/source_list.bzl b/build/source_list.bzl index c1a8bd0..4f1c45f 100644 --- a/build/source_list.bzl +++ b/build/source_list.bzl
@@ -1509,6 +1509,7 @@ "quic/load_balancer/load_balancer_server_id_test.cc", ] moqt_hdrs = [ + "quic/moqt/moqt_bitrate_adjuster.h", "quic/moqt/moqt_cached_object.h", "quic/moqt/moqt_framer.h", "quic/moqt/moqt_known_track_publisher.h", @@ -1531,6 +1532,8 @@ "quic/moqt/tools/moqt_server.h", ] moqt_srcs = [ + "quic/moqt/moqt_bitrate_adjuster.cc", + "quic/moqt/moqt_bitrate_adjuster_test.cc", "quic/moqt/moqt_cached_object.cc", "quic/moqt/moqt_framer.cc", "quic/moqt/moqt_framer_test.cc",
diff --git a/build/source_list.gni b/build/source_list.gni index 5f3ec02..e5285c3 100644 --- a/build/source_list.gni +++ b/build/source_list.gni
@@ -1513,6 +1513,7 @@ "src/quiche/quic/load_balancer/load_balancer_server_id_test.cc", ] moqt_hdrs = [ + "src/quiche/quic/moqt/moqt_bitrate_adjuster.h", "src/quiche/quic/moqt/moqt_cached_object.h", "src/quiche/quic/moqt/moqt_framer.h", "src/quiche/quic/moqt/moqt_known_track_publisher.h", @@ -1535,6 +1536,8 @@ "src/quiche/quic/moqt/tools/moqt_server.h", ] moqt_srcs = [ + "src/quiche/quic/moqt/moqt_bitrate_adjuster.cc", + "src/quiche/quic/moqt/moqt_bitrate_adjuster_test.cc", "src/quiche/quic/moqt/moqt_cached_object.cc", "src/quiche/quic/moqt/moqt_framer.cc", "src/quiche/quic/moqt/moqt_framer_test.cc",
diff --git a/build/source_list.json b/build/source_list.json index 94ad682..b1daa01 100644 --- a/build/source_list.json +++ b/build/source_list.json
@@ -1512,6 +1512,7 @@ "quiche/quic/load_balancer/load_balancer_server_id_test.cc" ], "moqt_hdrs": [ + "quiche/quic/moqt/moqt_bitrate_adjuster.h", "quiche/quic/moqt/moqt_cached_object.h", "quiche/quic/moqt/moqt_framer.h", "quiche/quic/moqt/moqt_known_track_publisher.h", @@ -1534,6 +1535,8 @@ "quiche/quic/moqt/tools/moqt_server.h" ], "moqt_srcs": [ + "quiche/quic/moqt/moqt_bitrate_adjuster.cc", + "quiche/quic/moqt/moqt_bitrate_adjuster_test.cc", "quiche/quic/moqt/moqt_cached_object.cc", "quiche/quic/moqt/moqt_framer.cc", "quiche/quic/moqt/moqt_framer_test.cc",
diff --git a/quiche/quic/core/quic_bandwidth.h b/quiche/quic/core/quic_bandwidth.h index 48c9eec..db9fd22 100644 --- a/quiche/quic/core/quic_bandwidth.h +++ b/quiche/quic/core/quic_bandwidth.h
@@ -101,6 +101,11 @@ std::string ToDebuggingValue() const; + template <typename Sink> + friend void AbslStringify(Sink& sink, QuicBandwidth bandwidth) { + sink.Append(bandwidth.ToDebuggingValue()); + } + private: explicit constexpr QuicBandwidth(int64_t bits_per_second) : bits_per_second_(bits_per_second >= 0 ? bits_per_second : 0) {}
diff --git a/quiche/quic/core/quic_time.h b/quiche/quic/core/quic_time.h index a8c86cf..2aaf7aa 100644 --- a/quiche/quic/core/quic_time.h +++ b/quiche/quic/core/quic_time.h
@@ -66,7 +66,7 @@ constexpr int64_t ToMicroseconds() const { return time_offset_; } // Converts the time offset to an Abseil duration. - constexpr absl::Duration ToAbsl() { + constexpr absl::Duration ToAbsl() const { if (ABSL_PREDICT_FALSE(IsInfinite())) { return absl::InfiniteDuration(); }
diff --git a/quiche/quic/moqt/moqt_bitrate_adjuster.cc b/quiche/quic/moqt/moqt_bitrate_adjuster.cc new file mode 100644 index 0000000..0326a44 --- /dev/null +++ b/quiche/quic/moqt/moqt_bitrate_adjuster.cc
@@ -0,0 +1,90 @@ +// Copyright 2024 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "quiche/quic/moqt/moqt_bitrate_adjuster.h" + +#include <algorithm> +#include <cstdint> + +#include "quiche/quic/core/quic_bandwidth.h" +#include "quiche/quic/core/quic_time.h" +#include "quiche/common/platform/api/quiche_logging.h" +#include "quiche/web_transport/web_transport.h" + +namespace moqt { + +namespace { + +using ::quic::QuicBandwidth; +using ::quic::QuicTime; +using ::quic::QuicTimeDelta; + +// Whenever adjusting bitrate down, it is set to `kTargetBitrateMultiplier * +// bw`, where `bw` is typically windowed max bandwidth reported by BBR. The +// current value selected is a bit arbitrary; ideally, we would adjust down to +// the application data goodput (i.e. goodput excluding all of the framing +// overhead), but that would either require us knowing how to compute the +// framing overhead correctly, or implementing our own application-level goodput +// monitoring. +constexpr float kTargetBitrateMultiplier = 0.9f; + +// Avoid re-adjusting bitrate within N RTTs after adjusting it. Here, on a +// typical 20ms connection, 40 RTTs is 800ms. Cap the limit at 3000ms. +constexpr float kMinTimeBetweenAdjustmentsInRtts = 40; +constexpr QuicTimeDelta kMaxTimeBetweenAdjustments = + QuicTimeDelta::FromSeconds(3); + +} // namespace + +void MoqtBitrateAdjuster::OnObjectAckReceived( + uint64_t /*group_id*/, uint64_t /*object_id*/, + QuicTimeDelta delta_from_deadline) { + if (delta_from_deadline < QuicTimeDelta::Zero()) { + // While adjusting down upon the first sign of packets getting late might + // seem aggressive, note that: + // - By the time user occurs, this is already a user-visible issue (so, in + // some sense, this isn't aggressive enough). + // - The adjustment won't happen if we're already bellow `k * max_bw`, so + // if the delays are due to other factors like bufferbloat, the measured + // bandwidth will likely not result in a downwards adjustment. + AttemptAdjustingDown(); + } +} + +void MoqtBitrateAdjuster::AttemptAdjustingDown() { + webtransport::SessionStats stats = session_->GetSessionStats(); + + // Wait for a while after doing an adjustment. There are non-trivial costs to + // switching, so we should rate limit adjustments. + QuicTimeDelta adjustment_delay = + QuicTimeDelta(stats.smoothed_rtt * kMinTimeBetweenAdjustmentsInRtts); + adjustment_delay = std::min(adjustment_delay, kMaxTimeBetweenAdjustments); + QuicTime now = clock_->ApproximateNow(); + if (now - last_adjustment_time_ < adjustment_delay) { + return; + } + + // Only adjust downwards. + QuicBandwidth target_bandwidth = + kTargetBitrateMultiplier * + QuicBandwidth::FromBitsPerSecond(stats.estimated_send_rate_bps); + QuicBandwidth current_bandwidth = adjustable_->GetCurrentBitrate(); + if (current_bandwidth <= target_bandwidth) { + return; + } + + QUICHE_DLOG(INFO) << "Adjusting the bitrate from " << current_bandwidth + << " to " << target_bandwidth; + bool success = adjustable_->AdjustBitrate(target_bandwidth); + if (success) { + last_adjustment_time_ = now; + } +} + +void MoqtBitrateAdjuster::OnObjectAckSupportKnown(bool supported) { + QUICHE_DLOG_IF(WARNING, !supported) + << "OBJECT_ACK not supported; bitrate adjustments will not work."; +} + +} // namespace moqt
diff --git a/quiche/quic/moqt/moqt_bitrate_adjuster.h b/quiche/quic/moqt/moqt_bitrate_adjuster.h new file mode 100644 index 0000000..0075c51 --- /dev/null +++ b/quiche/quic/moqt/moqt_bitrate_adjuster.h
@@ -0,0 +1,59 @@ +// Copyright 2024 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef QUICHE_QUIC_MOQT_MOQT_BITRATE_ADJUSTER_H_ +#define QUICHE_QUIC_MOQT_MOQT_BITRATE_ADJUSTER_H_ + +#include <cstdint> + +#include "quiche/quic/core/quic_bandwidth.h" +#include "quiche/quic/core/quic_clock.h" +#include "quiche/quic/core/quic_time.h" +#include "quiche/quic/moqt/moqt_session.h" +#include "quiche/web_transport/web_transport.h" + +namespace moqt { + +// A sender that can potentially have its outgoing bitrate adjusted. +class BitrateAdjustable { + public: + virtual ~BitrateAdjustable() {} + + // Returns the currently used bitrate. + virtual quic::QuicBandwidth GetCurrentBitrate() const = 0; + // Adjusts the bitrate to a new target. Returns true if the adjustment was + // successful. + virtual bool AdjustBitrate(quic::QuicBandwidth bandwidth) = 0; +}; + +// MoqtBitrateAdjuster monitors the progress of delivery for a single track, and +// adjusts the bitrate of the track in question accordingly. +class MoqtBitrateAdjuster : public MoqtPublishingMonitorInterface { + public: + MoqtBitrateAdjuster(const quic::QuicClock* clock, + webtransport::Session* session, + BitrateAdjustable* adjustable) + : clock_(clock), + session_(session), + adjustable_(adjustable), + last_adjustment_time_(clock->ApproximateNow()) {} + + // MoqtPublishingMonitorInterface implementation. + void OnObjectAckSupportKnown(bool supported) override; + void OnObjectAckReceived(uint64_t group_id, uint64_t object_id, + quic::QuicTimeDelta delta_from_deadline) override; + + private: + // Attempts adjusting the bitrate down. + void AttemptAdjustingDown(); + + const quic::QuicClock* clock_; // Not owned. + webtransport::Session* session_; // Not owned. + BitrateAdjustable* adjustable_; // Not owned. + quic::QuicTime last_adjustment_time_; +}; + +} // namespace moqt + +#endif // QUICHE_QUIC_MOQT_MOQT_BITRATE_ADJUSTER_H_
diff --git a/quiche/quic/moqt/moqt_bitrate_adjuster_test.cc b/quiche/quic/moqt/moqt_bitrate_adjuster_test.cc new file mode 100644 index 0000000..c3ac103 --- /dev/null +++ b/quiche/quic/moqt/moqt_bitrate_adjuster_test.cc
@@ -0,0 +1,137 @@ +// Copyright 2024 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "quiche/quic/moqt/moqt_bitrate_adjuster.h" + +#include "quiche/quic/core/quic_bandwidth.h" +#include "quiche/quic/core/quic_time.h" +#include "quiche/quic/test_tools/mock_clock.h" +#include "quiche/common/platform/api/quiche_test.h" +#include "quiche/web_transport/test_tools/mock_web_transport.h" +#include "quiche/web_transport/web_transport.h" + +namespace moqt::test { +namespace { + +using ::quic::QuicBandwidth; +using ::quic::QuicTimeDelta; +using ::testing::_; + +// Simple adjustable object that just keeps track of whatever value has been +// assigned to it, and has a mock method to notify of it changing. +class MockBitrateAdjustable : public BitrateAdjustable { + public: + explicit MockBitrateAdjustable(QuicBandwidth initial_bitrate) + : bitrate_(initial_bitrate) {} + + QuicBandwidth GetCurrentBitrate() const override { return bitrate_; } + bool AdjustBitrate(QuicBandwidth bandwidth) override { + bitrate_ = bandwidth; + OnBitrateAdjusted(bandwidth); + return true; + } + + MOCK_METHOD(void, OnBitrateAdjusted, (QuicBandwidth new_bitrate), ()); + + private: + QuicBandwidth bitrate_; +}; + +constexpr QuicBandwidth kDefaultBitrate = + QuicBandwidth::FromBitsPerSecond(2000); +constexpr QuicTimeDelta kDefaultRtt = QuicTimeDelta::FromMilliseconds(20); + +class MoqtBitrateAdjusterTest : public quiche::test::QuicheTest { + protected: + MoqtBitrateAdjusterTest() + : adjustable_(kDefaultBitrate), + adjuster_(&clock_, &session_, &adjustable_) { + stats_.min_rtt = stats_.smoothed_rtt = kDefaultRtt.ToAbsl(); + stats_.estimated_send_rate_bps = (1.2 * kDefaultBitrate).ToBitsPerSecond(); + ON_CALL(session_, GetSessionStats()).WillByDefault([this] { + return stats_; + }); + } + + MockBitrateAdjustable adjustable_; + webtransport::SessionStats stats_; + quic::MockClock clock_; + webtransport::test::MockSession session_; + MoqtBitrateAdjuster adjuster_; +}; + +TEST_F(MoqtBitrateAdjusterTest, SteadyState) { + // The fact that estimated bitrate is 1bps should not matter, since we never + // have a reason to adjust down. + stats_.estimated_send_rate_bps = 1; + + EXPECT_CALL(adjustable_, OnBitrateAdjusted(_)).Times(0); + for (int i = 0; i < 250; ++i) { + clock_.AdvanceTime(kDefaultRtt); + for (int j = 0; j < 10; ++j) { + adjuster_.OnObjectAckReceived(i, j, kDefaultRtt * 2); + } + } +} + +TEST_F(MoqtBitrateAdjusterTest, AdjustDownOnce) { + stats_.estimated_send_rate_bps = (0.5 * kDefaultBitrate).ToBitsPerSecond(); + + // First time will be skipped, since we aren't far enough into connection. + EXPECT_CALL(adjustable_, OnBitrateAdjusted(_)).Times(0); + adjuster_.OnObjectAckReceived(0, 0, QuicTimeDelta::FromMilliseconds(-1)); + + clock_.AdvanceTime(100 * kDefaultRtt); + EXPECT_CALL(adjustable_, OnBitrateAdjusted(_)) + .WillOnce([](QuicBandwidth new_bitrate) { + EXPECT_LT(new_bitrate, kDefaultBitrate); + }); + adjuster_.OnObjectAckReceived(0, 1, QuicTimeDelta::FromMilliseconds(-1)); +} + +TEST_F(MoqtBitrateAdjusterTest, AdjustDownTwice) { + int adjusted_times = 0; + EXPECT_CALL(adjustable_, OnBitrateAdjusted(_)).WillRepeatedly([&] { + ++adjusted_times; + }); + + clock_.AdvanceTime(100 * kDefaultRtt); + stats_.estimated_send_rate_bps = (0.5 * kDefaultBitrate).ToBitsPerSecond(); + adjuster_.OnObjectAckReceived(0, 0, QuicTimeDelta::FromMilliseconds(-1)); + EXPECT_EQ(adjusted_times, 1); + + clock_.AdvanceTime(100 * kDefaultRtt); + stats_.estimated_send_rate_bps = (0.25 * kDefaultBitrate).ToBitsPerSecond(); + adjuster_.OnObjectAckReceived(0, 1, QuicTimeDelta::FromMilliseconds(-1)); + EXPECT_EQ(adjusted_times, 2); +} + +TEST_F(MoqtBitrateAdjusterTest, AdjustDownSecondTimeIgnoredDueToTimeLimit) { + int adjusted_times = 0; + EXPECT_CALL(adjustable_, OnBitrateAdjusted(_)).WillRepeatedly([&] { + ++adjusted_times; + }); + + clock_.AdvanceTime(100 * kDefaultRtt); + stats_.estimated_send_rate_bps = (0.5 * kDefaultBitrate).ToBitsPerSecond(); + adjuster_.OnObjectAckReceived(0, 0, QuicTimeDelta::FromMilliseconds(-1)); + EXPECT_EQ(adjusted_times, 1); + + // Two round trips is not enough delay to trigger another adjustment. + clock_.AdvanceTime(2 * kDefaultRtt); + stats_.estimated_send_rate_bps = (0.25 * kDefaultBitrate).ToBitsPerSecond(); + adjuster_.OnObjectAckReceived(0, 1, QuicTimeDelta::FromMilliseconds(-1)); + EXPECT_EQ(adjusted_times, 1); +} + +TEST_F(MoqtBitrateAdjusterTest, AdjustDownIgnoredDueToHighBandwidthMeasured) { + EXPECT_CALL(adjustable_, OnBitrateAdjusted(_)).Times(0); + + clock_.AdvanceTime(100 * kDefaultRtt); + stats_.estimated_send_rate_bps = (2.0 * kDefaultBitrate).ToBitsPerSecond(); + adjuster_.OnObjectAckReceived(0, 0, QuicTimeDelta::FromMilliseconds(-1)); +} + +} // namespace +} // namespace moqt::test
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h index e5a90f5..c8d29c1 100644 --- a/quiche/quic/moqt/moqt_session.h +++ b/quiche/quic/moqt/moqt_session.h
@@ -141,6 +141,7 @@ RemoteTrack::Visitor* visitor, MoqtSubscribeParameters parameters = MoqtSubscribeParameters()); + webtransport::Session* session() { return session_; } MoqtSessionCallbacks& callbacks() { return callbacks_; } MoqtPublisher* publisher() { return publisher_; } void set_publisher(MoqtPublisher* publisher) { publisher_ = publisher; }
diff --git a/quiche/quic/moqt/tools/moqt_simulator_bin.cc b/quiche/quic/moqt/tools/moqt_simulator_bin.cc index b3369cb..9916eca 100644 --- a/quiche/quic/moqt/tools/moqt_simulator_bin.cc +++ b/quiche/quic/moqt/tools/moqt_simulator_bin.cc
@@ -5,6 +5,7 @@ // moqt_simulator simulates the behavior of MoQ Transport under various network // conditions and application settings. +#include <cmath> #include <cstddef> #include <cstdint> #include <cstring> @@ -13,12 +14,14 @@ #include <optional> #include <string> #include <utility> +#include <vector> #include "absl/algorithm/container.h" #include "absl/container/flat_hash_map.h" #include "absl/strings/ascii.h" #include "absl/strings/str_cat.h" #include "absl/strings/str_format.h" +#include "absl/strings/str_join.h" #include "absl/strings/string_view.h" #include "absl/time/time.h" #include "quiche/quic/core/crypto/quic_random.h" @@ -26,6 +29,7 @@ #include "quiche/quic/core/quic_clock.h" #include "quiche/quic/core/quic_time.h" #include "quiche/quic/core/quic_types.h" +#include "quiche/quic/moqt/moqt_bitrate_adjuster.h" #include "quiche/quic/moqt/moqt_known_track_publisher.h" #include "quiche/quic/moqt/moqt_messages.h" #include "quiche/quic/moqt/moqt_outgoing_queue.h" @@ -105,7 +109,8 @@ // object generated is a timestamp, the rest is all zeroes. The first object in // the group can be made bigger than the rest, to simulate the profile of real // video bitstreams. -class ObjectGenerator : public quic::simulator::Actor { +class ObjectGenerator : public quic::simulator::Actor, + public moqt::BitrateAdjustable { public: ObjectGenerator(Simulator* simulator, const std::string& actor_name, MoqtSession* session, FullTrackName track_name, @@ -115,25 +120,15 @@ queue_(std::make_shared<MoqtOutgoingQueue>( track_name, MoqtForwardingPreference::kGroup)), keyframe_interval_(keyframe_interval), - time_between_frames_(QuicTimeDelta::FromMicroseconds(1.0e6 / fps)) { - int p_frame_count = keyframe_interval - 1; - // Compute the frame sizes as a fraction of the total group size. - float i_frame_fraction = i_to_p_ratio / (i_to_p_ratio + p_frame_count); - float p_frame_fraction = 1.0 / (i_to_p_ratio + p_frame_count); - - QuicTimeDelta group_duration = - QuicTimeDelta::FromMicroseconds(1.0e6 * keyframe_interval / fps); - QuicByteCount group_byte_count = group_duration * bitrate; - i_frame_size_ = i_frame_fraction * group_byte_count; - p_frame_size_ = p_frame_fraction * group_byte_count; - QUICHE_CHECK_GE(i_frame_size_, 8u) << "Not enough space for a timestamp"; - QUICHE_CHECK_GE(p_frame_size_, 8u) << "Not enough space for a timestamp"; - } + time_between_frames_(QuicTimeDelta::FromMicroseconds(1.0e6 / fps)), + i_to_p_ratio_(i_to_p_ratio), + bitrate_(bitrate), + bitrate_history_({bitrate}) {} void Act() override { ++frame_number_; bool i_frame = (frame_number_ % keyframe_interval_) == 0; - size_t size = i_frame ? i_frame_size_ : p_frame_size_; + size_t size = GetFrameSize(i_frame); QuicheBuffer buffer(quiche::SimpleBufferAllocator::Get(), size); memset(buffer.data(), 0, buffer.size()); @@ -151,13 +146,44 @@ std::shared_ptr<MoqtOutgoingQueue> queue() { return queue_; } size_t total_objects_sent() const { return frame_number_ + 1; } + size_t GetFrameSize(bool i_frame) const { + int p_frame_count = keyframe_interval_ - 1; + // Compute the frame sizes as a fraction of the total group size. + float i_frame_fraction = i_to_p_ratio_ / (i_to_p_ratio_ + p_frame_count); + float p_frame_fraction = 1.0 / (i_to_p_ratio_ + p_frame_count); + float frame_fraction = i_frame ? i_frame_fraction : p_frame_fraction; + + QuicTimeDelta group_duration = time_between_frames_ * keyframe_interval_; + QuicByteCount group_byte_count = group_duration * bitrate_; + size_t frame_size = std::ceil(frame_fraction * group_byte_count); + QUICHE_CHECK_GE(frame_size, 8u) + << "Frame size is too small for a timestamp"; + return frame_size; + } + + quic::QuicBandwidth GetCurrentBitrate() const override { return bitrate_; } + bool AdjustBitrate(quic::QuicBandwidth bandwidth) override { + bitrate_ = bandwidth; + bitrate_history_.push_back(bandwidth); + return true; + } + std::string FormatBitrateHistory() const { + std::vector<std::string> bits; + bits.reserve(bitrate_history_.size()); + for (QuicBandwidth bandwidth : bitrate_history_) { + bits.push_back(absl::StrCat(bandwidth)); + } + return absl::StrJoin(bits, " -> "); + } + private: std::shared_ptr<MoqtOutgoingQueue> queue_; int keyframe_interval_; QuicTimeDelta time_between_frames_; - QuicByteCount i_frame_size_; - QuicByteCount p_frame_size_; + float i_to_p_ratio_; + QuicBandwidth bitrate_; int frame_number_ = -1; + std::vector<QuicBandwidth> bitrate_history_; }; class ObjectReceiver : public RemoteTrack::Visitor { @@ -171,7 +197,9 @@ QUICHE_CHECK(!error_reason_phrase.has_value()) << *error_reason_phrase; } - void OnCanAckObjects(MoqtObjectAckFunction) override {} + void OnCanAckObjects(MoqtObjectAckFunction ack_function) override { + object_ack_function_ = std::move(ack_function); + } void OnObjectFragment(const FullTrackName& full_track_name, uint64_t group_sequence, uint64_t object_sequence, @@ -222,6 +250,10 @@ ++full_objects_received_late_; } else { ++full_objects_received_on_time_; + total_bytes_received_on_time_ += payload.size(); + } + if (object_ack_function_) { + object_ack_function_(sequence.group, sequence.object, deadline_ - delay); } } @@ -232,17 +264,22 @@ size_t full_objects_received_late() const { return full_objects_received_late_; } + size_t total_bytes_received_on_time() const { + return total_bytes_received_on_time_; + } private: const QuicClock* clock_ = nullptr; // TODO: figure out when partial objects should be discarded. absl::flat_hash_map<FullSequence, std::string> partial_objects_; + MoqtObjectAckFunction object_ack_function_ = nullptr; size_t full_objects_received_ = 0; QuicTimeDelta deadline_; size_t full_objects_received_on_time_ = 0; size_t full_objects_received_late_ = 0; + size_t total_bytes_received_on_time_ = 0; }; // Computes the size of the network queue on the switch. @@ -272,6 +309,8 @@ TrackName(), parameters.keyframe_interval, parameters.fps, parameters.i_to_p_ratio, parameters.bitrate), receiver_(simulator_.GetClock(), parameters.deadline), + adjuster_(simulator_.GetClock(), client_endpoint_.session()->session(), + &generator_), parameters_(parameters) {} MoqtSession* client_session() { return client_endpoint_.session(); } @@ -292,9 +331,11 @@ constexpr QuicTimeDelta kConnectionTimeout = QuicTimeDelta::FromSeconds(1); // Perform the QUIC and the MoQT handshake. + client_session()->set_support_object_acks(true); client_session()->callbacks().session_established_callback = [this] { client_established_ = true; }; + server_session()->set_support_object_acks(true); server_session()->callbacks().session_established_callback = [this] { server_established_ = true; }; @@ -307,6 +348,7 @@ generator_.queue()->SetDeliveryOrder(parameters_.delivery_order); client_session()->set_publisher(&publisher_); + client_session()->SetMonitoringInterfaceForTrack(TrackName(), &adjuster_); publisher_.Add(generator_.queue()); // The simulation is started as follows. At t=0: @@ -324,6 +366,8 @@ absl::Duration wait_at_the_end = 8 * client_endpoint_.quic_session()->GetSessionStats().smoothed_rtt; simulator_.RunFor(QuicTimeDelta(wait_at_the_end)); + const QuicTimeDelta total_time = + parameters_.duration + QuicTimeDelta(wait_at_the_end); absl::PrintF("Ran simulation for %v + %.1fms\n", parameters_.duration, absl::ToDoubleMilliseconds(wait_at_the_end)); @@ -344,6 +388,11 @@ FormatPercentage(receiver_.full_objects_received_late(), total_sent)); absl::PrintF(" never: %s\n", FormatPercentage(missing_objects, total_sent)); + absl::PrintF("\n"); + absl::PrintF("Average on-time goodput: %v\n", + QuicBandwidth::FromBytesAndTimeDelta( + receiver_.total_bytes_received_on_time(), total_time)); + absl::PrintF("Bitrates: %s\n", generator_.FormatBitrateHistory()); } private: @@ -356,6 +405,7 @@ MoqtKnownTrackPublisher publisher_; ObjectGenerator generator_; ObjectReceiver receiver_; + MoqtBitrateAdjuster adjuster_; SimulationParameters parameters_; bool client_established_ = false; @@ -376,6 +426,11 @@ "Frame delivery deadline (used for measurement only)."); DEFINE_QUICHE_COMMAND_LINE_FLAG( + absl::Duration, duration, + moqt::test::SimulationParameters().duration.ToAbsl(), + "Duration of the simulation"); + +DEFINE_QUICHE_COMMAND_LINE_FLAG( std::string, delivery_order, "desc", "Delivery order used for the MoQT track simulated ('asc' or 'desc')."); @@ -386,6 +441,8 @@ quiche::GetQuicheCommandLineFlag(FLAGS_bandwidth)); parameters.deadline = quic::QuicTimeDelta(quiche::GetQuicheCommandLineFlag(FLAGS_deadline)); + parameters.duration = + quic::QuicTimeDelta(quiche::GetQuicheCommandLineFlag(FLAGS_duration)); std::string raw_delivery_order = absl::AsciiStrToLower( quiche::GetQuicheCommandLineFlag(FLAGS_delivery_order));