Move all of the MOQT simulator logic into a file that is separate from the CLI.
Also fix some lint errors and typos.
PiperOrigin-RevId: 821842399
diff --git a/build/source_list.bzl b/build/source_list.bzl
index 5180db2..16360f4 100644
--- a/build/source_list.bzl
+++ b/build/source_list.bzl
@@ -1640,11 +1640,13 @@
"quic/moqt/test_tools/moqt_mock_visitor.h",
"quic/moqt/test_tools/moqt_parser_test_visitor.h",
"quic/moqt/test_tools/moqt_session_peer.h",
+ "quic/moqt/test_tools/moqt_simulator.h",
"quic/moqt/test_tools/moqt_simulator_harness.h",
"quic/moqt/test_tools/moqt_test_message.h",
]
moqt_test_support_srcs = [
"quic/moqt/test_tools/moqt_framer_utils.cc",
+ "quic/moqt/test_tools/moqt_simulator.cc",
"quic/moqt/test_tools/moqt_simulator_harness.cc",
]
binary_http_hdrs = [
diff --git a/build/source_list.gni b/build/source_list.gni
index a8fbd1c..ea50751 100644
--- a/build/source_list.gni
+++ b/build/source_list.gni
@@ -1645,11 +1645,13 @@
"src/quiche/quic/moqt/test_tools/moqt_mock_visitor.h",
"src/quiche/quic/moqt/test_tools/moqt_parser_test_visitor.h",
"src/quiche/quic/moqt/test_tools/moqt_session_peer.h",
+ "src/quiche/quic/moqt/test_tools/moqt_simulator.h",
"src/quiche/quic/moqt/test_tools/moqt_simulator_harness.h",
"src/quiche/quic/moqt/test_tools/moqt_test_message.h",
]
moqt_test_support_srcs = [
"src/quiche/quic/moqt/test_tools/moqt_framer_utils.cc",
+ "src/quiche/quic/moqt/test_tools/moqt_simulator.cc",
"src/quiche/quic/moqt/test_tools/moqt_simulator_harness.cc",
]
binary_http_hdrs = [
diff --git a/build/source_list.json b/build/source_list.json
index 5614381..b9d4167 100644
--- a/build/source_list.json
+++ b/build/source_list.json
@@ -1644,11 +1644,13 @@
"quiche/quic/moqt/test_tools/moqt_mock_visitor.h",
"quiche/quic/moqt/test_tools/moqt_parser_test_visitor.h",
"quiche/quic/moqt/test_tools/moqt_session_peer.h",
+ "quiche/quic/moqt/test_tools/moqt_simulator.h",
"quiche/quic/moqt/test_tools/moqt_simulator_harness.h",
"quiche/quic/moqt/test_tools/moqt_test_message.h"
],
"moqt_test_support_srcs": [
"quiche/quic/moqt/test_tools/moqt_framer_utils.cc",
+ "quiche/quic/moqt/test_tools/moqt_simulator.cc",
"quiche/quic/moqt/test_tools/moqt_simulator_harness.cc"
],
"binary_http_hdrs": [
diff --git a/quiche/quic/moqt/test_tools/moqt_simulator.cc b/quiche/quic/moqt/test_tools/moqt_simulator.cc
new file mode 100644
index 0000000..cbe0037
--- /dev/null
+++ b/quiche/quic/moqt/test_tools/moqt_simulator.cc
@@ -0,0 +1,380 @@
+// Copyright 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/quic/moqt/test_tools/moqt_simulator.h"
+
+#include <cmath>
+#include <cstddef>
+#include <cstdint>
+#include <cstring>
+#include <iostream>
+#include <memory>
+#include <optional>
+#include <string>
+#include <utility>
+#include <variant>
+#include <vector>
+
+#include "absl/algorithm/container.h"
+#include "absl/base/casts.h"
+#include "absl/strings/str_cat.h"
+#include "absl/strings/str_format.h"
+#include "absl/strings/str_join.h"
+#include "absl/strings/str_replace.h"
+#include "absl/strings/string_view.h"
+#include "absl/time/time.h"
+#include "quiche/quic/core/crypto/quic_random.h"
+#include "quiche/quic/core/quic_bandwidth.h"
+#include "quiche/quic/core/quic_clock.h"
+#include "quiche/quic/core/quic_time.h"
+#include "quiche/quic/core/quic_types.h"
+#include "quiche/quic/moqt/moqt_bitrate_adjuster.h"
+#include "quiche/quic/moqt/moqt_known_track_publisher.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_object.h"
+#include "quiche/quic/moqt/moqt_outgoing_queue.h"
+#include "quiche/quic/moqt/moqt_session.h"
+#include "quiche/quic/moqt/moqt_session_interface.h"
+#include "quiche/quic/moqt/moqt_trace_recorder.h"
+#include "quiche/quic/moqt/test_tools/moqt_simulator_harness.h"
+#include "quiche/quic/test_tools/simulator/actor.h"
+#include "quiche/quic/test_tools/simulator/link.h"
+#include "quiche/quic/test_tools/simulator/port.h"
+#include "quiche/quic/test_tools/simulator/simulator.h"
+#include "quiche/quic/test_tools/simulator/switch.h"
+#include "quiche/common/platform/api/quiche_logging.h"
+#include "quiche/common/quiche_buffer_allocator.h"
+#include "quiche/common/quiche_data_reader.h"
+#include "quiche/common/quiche_data_writer.h"
+#include "quiche/common/quiche_mem_slice.h"
+#include "quiche/common/simple_buffer_allocator.h"
+
+namespace moqt::test {
+namespace {
+
+using ::quiche::QuicheBuffer;
+using ::quiche::QuicheMemSlice;
+
+using ::quic::QuicBandwidth;
+using ::quic::QuicByteCount;
+using ::quic::QuicTime;
+using ::quic::QuicTimeDelta;
+
+using ::quic::simulator::Endpoint;
+using ::quic::simulator::Simulator;
+
+// In the simulation, the server link is supposed to be the bottleneck, so this
+// value just has to be sufficiently larger than the server link bandwidth.
+constexpr QuicBandwidth kClientLinkBandwidth =
+ QuicBandwidth::FromBitsPerSecond(10.0e6);
+constexpr MoqtVersion kMoqtVersion = kDefaultMoqtVersion;
+
+// Track name used by the simulator.
+FullTrackName TrackName() { return FullTrackName("test", "track"); }
+
+std::string FormatPercentage(size_t n, size_t total) {
+ float percentage = 100.0f * n / total;
+ return absl::StrFormat("%d / %d (%.2f%%)", n, total, percentage);
+}
+
+using OutputField = std::pair<absl::string_view, std::string>;
+
+OutputField OutputFraction(absl::string_view key, size_t n, size_t total) {
+ float fraction = static_cast<float>(n) / total;
+ return OutputField(key, absl::StrCat(fraction));
+}
+
+float RandFloat(quic::QuicRandom& rng) {
+ uint32_t number;
+ rng.RandBytes(&number, sizeof(number));
+ return absl::bit_cast<float>((number & 0x7fffff) | 0x3f800000) - 1.0f;
+}
+
+} // namespace
+
+ModificationBox::ModificationBox(Endpoint* wrapped_endpoint,
+ const SimulationParameters& parameters)
+ : Endpoint(wrapped_endpoint->simulator(),
+ absl::StrCat(wrapped_endpoint->name(), " (modification box)")),
+ wrapped_endpoint_(*wrapped_endpoint),
+ parameters_(parameters) {}
+
+void ModificationBox::OnBeforeSimulationStart() {
+ if (!parameters_.blackhole_duration.IsZero()) {
+ float offset = 0.5f + RandFloat(*simulator()->GetRandomGenerator()) * 0.2f;
+ blackhole_start_time_ =
+ simulator()->GetClock()->Now() + offset * parameters_.duration;
+ }
+}
+
+void ModificationBox::AcceptPacket(
+ std::unique_ptr<quic::simulator::Packet> packet) {
+ quic::QuicRandom* const rng = simulator()->GetRandomGenerator();
+ const quic::QuicTime now = simulator()->GetClock()->Now();
+ bool drop = false;
+ if (parameters_.packet_loss_rate > 0) {
+ if (RandFloat(*rng) < parameters_.packet_loss_rate) {
+ drop = true;
+ }
+ }
+ if (blackhole_start_time_.has_value()) {
+ quic::QuicTime blackhole_end_time =
+ *blackhole_start_time_ + parameters_.blackhole_duration;
+ if (now >= blackhole_start_time_ && now < blackhole_end_time) {
+ drop = true;
+ }
+ }
+ if (!drop) {
+ wrapped_endpoint_.GetRxPort()->AcceptPacket(std::move(packet));
+ }
+}
+
+ObjectGenerator::ObjectGenerator(quic::simulator::Simulator* simulator,
+ const std::string& actor_name,
+ MoqtSession* session, FullTrackName track_name,
+ int keyframe_interval, int fps,
+ float i_to_p_ratio,
+ quic::QuicBandwidth bitrate)
+ : Actor(simulator, actor_name),
+ queue_(std::make_shared<MoqtOutgoingQueue>(
+ track_name, MoqtForwardingPreference::kSubgroup,
+ simulator->GetClock())),
+ keyframe_interval_(keyframe_interval),
+ time_between_frames_(QuicTimeDelta::FromMicroseconds(1.0e6 / fps)),
+ i_to_p_ratio_(i_to_p_ratio),
+ bitrate_(bitrate),
+ bitrate_history_({bitrate}) {}
+
+void ObjectGenerator::Act() {
+ ++frame_number_;
+ bool i_frame = (frame_number_ % keyframe_interval_) == 0;
+ size_t size = GetFrameSize(i_frame);
+
+ QuicheBuffer buffer(quiche::SimpleBufferAllocator::Get(), size);
+ memset(buffer.data(), 0, buffer.size());
+ quiche::QuicheDataWriter writer(size, buffer.data());
+ bool success = writer.WriteUInt64(clock_->Now().ToDebuggingValue());
+ QUICHE_CHECK(success);
+
+ queue_->AddObject(QuicheMemSlice(std::move(buffer)), i_frame);
+ Schedule(clock_->Now() + time_between_frames_);
+}
+
+size_t ObjectGenerator::GetFrameSize(bool i_frame) const {
+ int p_frame_count = keyframe_interval_ - 1;
+ // Compute the frame sizes as a fraction of the total group size.
+ float i_frame_fraction = i_to_p_ratio_ / (i_to_p_ratio_ + p_frame_count);
+ float p_frame_fraction = 1.0 / (i_to_p_ratio_ + p_frame_count);
+ float frame_fraction = i_frame ? i_frame_fraction : p_frame_fraction;
+
+ QuicTimeDelta group_duration = time_between_frames_ * keyframe_interval_;
+ QuicByteCount group_byte_count = group_duration * bitrate_;
+ size_t frame_size = std::ceil(frame_fraction * group_byte_count);
+ QUICHE_CHECK_GE(frame_size, 8u) << "Frame size is too small for a timestamp";
+ return frame_size;
+}
+
+void ObjectGenerator::ConsiderAdjustingBitrate(quic::QuicBandwidth bandwidth,
+ BitrateAdjustmentType type) {
+ if (moqt::ShouldIgnoreBitrateAdjustment(bandwidth, type, bitrate_,
+ /*min_change=*/0.01)) {
+ return;
+ }
+ bitrate_ = bandwidth;
+ bitrate_history_.push_back(bandwidth);
+}
+
+std::string ObjectGenerator::FormatBitrateHistory() const {
+ std::vector<std::string> bits;
+ bits.reserve(bitrate_history_.size());
+ for (QuicBandwidth bandwidth : bitrate_history_) {
+ bits.push_back(absl::StrCat(bandwidth));
+ }
+ return absl::StrJoin(bits, " -> ");
+}
+
+void ObjectReceiver::OnReply(
+ const FullTrackName& full_track_name,
+ std::variant<SubscribeOkData, MoqtRequestError> response) {
+ QUICHE_CHECK(full_track_name == TrackName());
+ if (std::holds_alternative<MoqtRequestError>(response)) {
+ MoqtRequestError error = std::get<MoqtRequestError>(response);
+ QUICHE_CHECK(!error.reason_phrase.empty()) << error.reason_phrase;
+ }
+}
+
+void ObjectReceiver::OnObjectFragment(const FullTrackName& full_track_name,
+ const PublishedObjectMetadata& metadata,
+ absl::string_view object,
+ bool end_of_message) {
+ QUICHE_DCHECK(full_track_name == TrackName());
+ if (metadata.status != MoqtObjectStatus::kNormal) {
+ QUICHE_DCHECK(end_of_message);
+ return;
+ }
+ if (!end_of_message) {
+ QUICHE_LOG(DFATAL) << "Partial receiving of objects wasn't enabled";
+ return;
+ }
+ OnFullObject(metadata.location, object);
+}
+
+void ObjectReceiver::OnFullObject(Location sequence,
+ absl::string_view payload) {
+ QUICHE_CHECK_GE(payload.size(), 8u);
+ quiche::QuicheDataReader reader(payload);
+ uint64_t time_us;
+ reader.ReadUInt64(&time_us);
+ QuicTime time = QuicTime::Zero() + QuicTimeDelta::FromMicroseconds(time_us);
+ QuicTimeDelta delay = clock_->Now() - time;
+ QUICHE_CHECK_GT(delay, QuicTimeDelta::Zero());
+ QUICHE_DCHECK(absl::c_all_of(reader.ReadRemainingPayload(),
+ [](char c) { return c == 0; }));
+ ++full_objects_received_;
+ if (delay > deadline_) {
+ ++full_objects_received_late_;
+ } else {
+ ++full_objects_received_on_time_;
+ total_bytes_received_on_time_ += payload.size();
+ }
+ if (object_ack_function_) {
+ object_ack_function_(sequence.group, sequence.object, deadline_ - delay);
+ }
+}
+
+// Computes the size of the network queue on the switch.
+constexpr QuicByteCount AdjustedQueueSize(
+ const SimulationParameters& parameters) {
+ if (parameters.network_queue_size > 0) {
+ return parameters.network_queue_size;
+ }
+ QuicByteCount bdp = parameters.bandwidth * parameters.min_rtt;
+ return 2 * bdp;
+}
+
+MoqtSimulator::MoqtSimulator(const SimulationParameters& parameters)
+ : simulator_(quic::QuicRandom::GetInstance()),
+ client_endpoint_(&simulator_, "Client", "Server", kMoqtVersion),
+ server_endpoint_(&simulator_, "Server", "Client", kMoqtVersion),
+ switch_(&simulator_, "Switch", 8, AdjustedQueueSize(parameters)),
+ modification_box_(switch_.port(1), parameters),
+ client_link_(&client_endpoint_, &modification_box_, kClientLinkBandwidth,
+ parameters.min_rtt * 0.25),
+ server_link_(&server_endpoint_, switch_.port(2), parameters.bandwidth,
+ parameters.min_rtt * 0.25),
+ generator_(&simulator_, "Client generator", client_endpoint_.session(),
+ TrackName(), parameters.keyframe_interval, parameters.fps,
+ parameters.i_to_p_ratio, parameters.bitrate),
+ receiver_(simulator_.GetClock(), parameters.deadline),
+ adjuster_(simulator_.GetClock(), client_endpoint_.session()->session(),
+ &generator_),
+ parameters_(parameters) {
+ if (parameters.aggregation_threshold > 0) {
+ QuicTimeDelta timeout = parameters.aggregation_timeout;
+ if (timeout.IsZero()) {
+ timeout = parameters.min_rtt * 0.25;
+ }
+ switch_.port_queue(2)->EnableAggregation(parameters.aggregation_threshold,
+ timeout);
+ }
+ client_endpoint_.RecordTrace();
+ QUICHE_DCHECK(client_endpoint_.trace_visitor() != nullptr);
+ client_endpoint_.session()->trace_recorder().set_trace(
+ client_endpoint_.trace_visitor()->trace());
+}
+
+std::string MoqtSimulator::GetClientSessionCongestionControl() {
+ return quic::CongestionControlTypeToString(client_endpoint_.quic_session()
+ ->connection()
+ ->sent_packet_manager()
+ .GetSendAlgorithm()
+ ->GetCongestionControlType());
+}
+
+void MoqtSimulator::Run() {
+ // Perform the QUIC and the MoQT handshake.
+ client_session()->set_support_object_acks(true);
+ server_session()->set_support_object_acks(true);
+ RunHandshakeOrDie(simulator_, client_endpoint_, server_endpoint_);
+
+ generator_.queue()->SetDeliveryOrder(parameters_.delivery_order);
+ client_session()->set_publisher(&publisher_);
+ if (parameters_.bitrate_adaptation) {
+ client_session()->SetMonitoringInterfaceForTrack(TrackName(), &adjuster_);
+ }
+ if (parameters_.alternative_timeout) {
+ client_session()->UseAlternateDeliveryTimeout();
+ }
+ publisher_.Add(generator_.queue());
+ modification_box_.OnBeforeSimulationStart();
+
+ // The simulation is started as follows. At t=0:
+ // (1) The server issues a subscribe request.
+ // (2) The client starts immediately generating data. At this point, the
+ // server does not yet have an active subscription, so the client has
+ // some catching up to do.
+ generator_.Start();
+ VersionSpecificParameters subscription_parameters;
+ if (parameters_.bitrate_adaptation) {
+ subscription_parameters.oack_window_size = parameters_.deadline;
+ }
+ if (!parameters_.delivery_timeout.IsInfinite()) {
+ subscription_parameters.delivery_timeout = parameters_.delivery_timeout;
+ }
+ server_session()->RelativeJoiningFetch(TrackName(), &receiver_, 0,
+ subscription_parameters);
+ simulator_.RunFor(parameters_.duration);
+
+ // At the end, we wait for eight RTTs until the connection settles down.
+ generator_.Stop();
+ wait_at_the_end_ =
+ 8 * client_endpoint_.quic_session()->GetSessionStats().smoothed_rtt;
+ simulator_.RunFor(QuicTimeDelta(wait_at_the_end_));
+}
+
+void MoqtSimulator::HumanReadableOutput() {
+ const QuicTimeDelta total_time =
+ parameters_.duration + QuicTimeDelta(wait_at_the_end_);
+ absl::PrintF("Ran simulation for %v + %.1fms\n", parameters_.duration,
+ absl::ToDoubleMilliseconds(wait_at_the_end_));
+ absl::PrintF("Congestion control used: %s\n",
+ GetClientSessionCongestionControl());
+
+ size_t total_sent = generator_.total_objects_sent();
+ size_t missing_objects =
+ generator_.total_objects_sent() - receiver_.full_objects_received();
+ absl::PrintF("Objects received: %s\n",
+ FormatPercentage(receiver_.full_objects_received(), total_sent));
+ absl::PrintF(
+ " on time: %s\n",
+ FormatPercentage(receiver_.full_objects_received_on_time(), total_sent));
+ absl::PrintF(
+ " late: %s\n",
+ FormatPercentage(receiver_.full_objects_received_late(), total_sent));
+ absl::PrintF(" never: %s\n",
+ FormatPercentage(missing_objects, total_sent));
+ absl::PrintF("\n");
+ absl::PrintF("Average on-time goodput: %v\n",
+ QuicBandwidth::FromBytesAndTimeDelta(
+ receiver_.total_bytes_received_on_time(), total_time));
+ absl::PrintF("Bitrates: %s\n", generator_.FormatBitrateHistory());
+}
+
+void MoqtSimulator::CustomOutput(absl::string_view format) {
+ size_t total_sent = generator_.total_objects_sent();
+ std::vector<OutputField> fields;
+ fields.push_back(OutputFraction("{on_time_fraction}",
+ receiver_.full_objects_received_on_time(),
+ total_sent));
+ fields.push_back(OutputFraction(
+ "{late_fraction}", receiver_.full_objects_received_late(), total_sent));
+ size_t missing_objects =
+ generator_.total_objects_sent() - receiver_.full_objects_received();
+ fields.push_back(
+ OutputFraction("{missing_fraction}", missing_objects, total_sent));
+ std::string output = absl::StrReplaceAll(format, fields);
+ std::cout << output << std::endl;
+}
+
+} // namespace moqt::test
diff --git a/quiche/quic/moqt/test_tools/moqt_simulator.h b/quiche/quic/moqt/test_tools/moqt_simulator.h
new file mode 100644
index 0000000..80b7d74
--- /dev/null
+++ b/quiche/quic/moqt/test_tools/moqt_simulator.h
@@ -0,0 +1,237 @@
+// Copyright 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_MOQT_TEST_TOOLS_MOQT_SIMULATOR_H_
+#define QUICHE_QUIC_MOQT_TEST_TOOLS_MOQT_SIMULATOR_H_
+
+#include <cstddef>
+#include <memory>
+#include <optional>
+#include <string>
+#include <utility>
+#include <variant>
+#include <vector>
+
+#include "absl/container/flat_hash_map.h"
+#include "absl/strings/string_view.h"
+#include "absl/time/time.h"
+#include "quiche/quic/core/quic_bandwidth.h"
+#include "quiche/quic/core/quic_time.h"
+#include "quiche/quic/core/quic_types.h"
+#include "quiche/quic/moqt/moqt_bitrate_adjuster.h"
+#include "quiche/quic/moqt/moqt_known_track_publisher.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_object.h"
+#include "quiche/quic/moqt/moqt_outgoing_queue.h"
+#include "quiche/quic/moqt/moqt_priority.h"
+#include "quiche/quic/moqt/moqt_session.h"
+#include "quiche/quic/moqt/moqt_session_interface.h"
+#include "quiche/quic/moqt/test_tools/moqt_simulator_harness.h"
+#include "quiche/quic/test_tools/simulator/actor.h"
+#include "quiche/quic/test_tools/simulator/link.h"
+#include "quiche/quic/test_tools/simulator/port.h"
+#include "quiche/quic/test_tools/simulator/switch.h"
+
+namespace moqt::test {
+
+// Parameters describing the scenario being simulated.
+struct SimulationParameters {
+ // Bottleneck bandwidth of the simulated scenario.
+ quic::QuicBandwidth bandwidth = quic::QuicBandwidth::FromBitsPerSecond(2.0e6);
+ // Intended RTT (as computed from propagation delay alone) between the client
+ // and the server.
+ quic::QuicTimeDelta min_rtt = quic::QuicTimeDelta::FromMilliseconds(20);
+ // The size of the network queue; if zero, assumed to be twice the BDP.
+ quic::QuicByteCount network_queue_size = 0;
+ // Duration for which the simulation is run.
+ quic::QuicTimeDelta duration = quic::QuicTimeDelta::FromSeconds(60);
+ // Packet aggregation timeout. If zero, this will be set to the quarter of
+ // min RTT.
+ quic::QuicTimeDelta aggregation_timeout = quic::QuicTimeDelta::Zero();
+ // Packet aggregation threshold. If zero, packet aggregation is disabled.
+ quic::QuicByteCount aggregation_threshold = 0;
+
+ // Count frames as useful only if they were received `deadline` after which
+ // they were generated.
+ quic::QuicTimeDelta deadline = quic::QuicTimeDelta::FromSeconds(2);
+ // Delivery order used by the publisher.
+ MoqtDeliveryOrder delivery_order = MoqtDeliveryOrder::kDescending;
+ // Delivery timeout for the subscription. This is mechanically independent
+ // from `deadline`, which is an accounting-only parameter (in practice, those
+ // should probably be close).
+ quic::QuicTimeDelta delivery_timeout = quic::QuicTimeDelta::Infinite();
+ // Whether MoqtBitrateAdjuster is enabled.
+ bool bitrate_adaptation = true;
+ // Use alternative delivery timeout design.
+ bool alternative_timeout = false;
+
+ // Number of frames in an individual group.
+ int keyframe_interval = 30 * 2;
+ // Number of frames generated per second.
+ int fps = 30;
+ // The ratio by which an I-frame is bigger than a P-frame.
+ float i_to_p_ratio = 2 / 1;
+ // The target bitrate of the data being exchanged.
+ quic::QuicBandwidth bitrate = quic::QuicBandwidth::FromBitsPerSecond(1.0e6);
+
+ // Adds random packet loss rate, as a fraction.
+ float packet_loss_rate = 0.0f;
+
+ // If non-zero, makes the traffic disappear in the middle of the connection
+ // for the specified duration.
+ quic::QuicTimeDelta blackhole_duration = quic::QuicTimeDelta::Zero();
+};
+
+// Box that enacts MoQT simulator specific modifications to the traffic.
+class ModificationBox : public quic::simulator::Endpoint,
+ public quic::simulator::UnconstrainedPortInterface {
+ public:
+ ModificationBox(Endpoint* wrapped_endpoint,
+ const SimulationParameters& parameters);
+
+ void OnBeforeSimulationStart();
+
+ // Endpoint implementation.
+ void Act() override {}
+ quic::simulator::UnconstrainedPortInterface* GetRxPort() override {
+ return this;
+ }
+ void SetTxPort(quic::simulator::ConstrainedPortInterface* port) override {
+ return wrapped_endpoint_.SetTxPort(port);
+ }
+
+ // UnconstrainedPortInterface implementation.
+ void AcceptPacket(std::unique_ptr<quic::simulator::Packet> packet) override;
+
+ private:
+ Endpoint& wrapped_endpoint_;
+ SimulationParameters parameters_;
+ std::optional<quic::QuicTime> blackhole_start_time_;
+};
+
+// Generates test objects at a constant rate. The first eight bytes of every
+// object generated is a timestamp, the rest is all zeroes. The first object in
+// the group can be made bigger than the rest, to simulate the profile of real
+// video bitstreams.
+class ObjectGenerator : public quic::simulator::Actor,
+ public moqt::BitrateAdjustable {
+ public:
+ ObjectGenerator(quic::simulator::Simulator* simulator,
+ const std::string& actor_name, MoqtSession* session,
+ FullTrackName track_name, int keyframe_interval, int fps,
+ float i_to_p_ratio, quic::QuicBandwidth bitrate);
+
+ void Act() override;
+
+ void Start() { Schedule(clock_->Now()); }
+ void Stop() { Unschedule(); }
+
+ std::shared_ptr<MoqtOutgoingQueue> queue() { return queue_; }
+ size_t total_objects_sent() const { return frame_number_ + 1; }
+
+ size_t GetFrameSize(bool i_frame) const;
+
+ quic::QuicBandwidth GetCurrentBitrate() const override { return bitrate_; }
+ bool CouldUseExtraBandwidth() override { return true; }
+ void ConsiderAdjustingBitrate(quic::QuicBandwidth bandwidth,
+ BitrateAdjustmentType type) override;
+ std::string FormatBitrateHistory() const;
+
+ private:
+ std::shared_ptr<MoqtOutgoingQueue> queue_;
+ int keyframe_interval_;
+ quic::QuicTimeDelta time_between_frames_;
+ float i_to_p_ratio_;
+ quic::QuicBandwidth bitrate_;
+ int frame_number_ = -1;
+ std::vector<quic::QuicBandwidth> bitrate_history_;
+};
+
+class ObjectReceiver : public SubscribeVisitor {
+ public:
+ explicit ObjectReceiver(const quic::QuicClock* clock,
+ quic::QuicTimeDelta deadline)
+ : clock_(clock), deadline_(deadline) {}
+
+ void OnReply(
+ const FullTrackName& full_track_name,
+ std::variant<SubscribeOkData, MoqtRequestError> response) override;
+
+ void OnCanAckObjects(MoqtObjectAckFunction ack_function) override {
+ object_ack_function_ = std::move(ack_function);
+ }
+
+ void OnObjectFragment(const FullTrackName& full_track_name,
+ const PublishedObjectMetadata& metadata,
+ absl::string_view object, bool end_of_message) override;
+
+ void OnPublishDone(FullTrackName /*full_track_name*/) override {}
+ void OnMalformedTrack(const FullTrackName& /*full_track_name*/) override {}
+ void OnStreamFin(const FullTrackName&, DataStreamIndex) override {}
+ void OnStreamReset(const FullTrackName&, DataStreamIndex) override {}
+
+ void OnFullObject(Location sequence, absl::string_view payload);
+
+ size_t full_objects_received() const { return full_objects_received_; }
+ size_t full_objects_received_on_time() const {
+ return full_objects_received_on_time_;
+ }
+ size_t full_objects_received_late() const {
+ return full_objects_received_late_;
+ }
+ size_t total_bytes_received_on_time() const {
+ return total_bytes_received_on_time_;
+ }
+
+ private:
+ const quic::QuicClock* clock_ = nullptr;
+ // TODO: figure out when partial objects should be discarded.
+ absl::flat_hash_map<Location, std::string> partial_objects_;
+ MoqtObjectAckFunction object_ack_function_ = nullptr;
+
+ size_t full_objects_received_ = 0;
+
+ quic::QuicTimeDelta deadline_;
+ size_t full_objects_received_on_time_ = 0;
+ size_t full_objects_received_late_ = 0;
+ size_t total_bytes_received_on_time_ = 0;
+};
+
+// Simulates the performance of MoQT transfer under the specified network
+// conditions.
+class MoqtSimulator {
+ public:
+ explicit MoqtSimulator(const SimulationParameters& parameters);
+
+ MoqtSession* client_session() { return client_endpoint_.session(); }
+ MoqtSession* server_session() { return server_endpoint_.session(); }
+
+ std::string GetClientSessionCongestionControl();
+
+ // Runs the simulation and outputs the results to stdout.
+ void Run();
+
+ void HumanReadableOutput();
+ void CustomOutput(absl::string_view format);
+
+ private:
+ quic::simulator::Simulator simulator_;
+ MoqtClientEndpoint client_endpoint_;
+ MoqtServerEndpoint server_endpoint_;
+ quic::simulator::Switch switch_;
+ ModificationBox modification_box_;
+ quic::simulator::SymmetricLink client_link_;
+ quic::simulator::SymmetricLink server_link_;
+ MoqtKnownTrackPublisher publisher_;
+ ObjectGenerator generator_;
+ ObjectReceiver receiver_;
+ MoqtBitrateAdjuster adjuster_;
+ SimulationParameters parameters_;
+
+ absl::Duration wait_at_the_end_;
+};
+
+} // namespace moqt::test
+
+#endif // QUICHE_QUIC_MOQT_TEST_TOOLS_MOQT_SIMULATOR_H_
diff --git a/quiche/quic/moqt/tools/moqt_simulator_bin.cc b/quiche/quic/moqt/tools/moqt_simulator_bin.cc
index 059b59d..8bf8874 100644
--- a/quiche/quic/moqt/tools/moqt_simulator_bin.cc
+++ b/quiche/quic/moqt/tools/moqt_simulator_bin.cc
@@ -5,540 +5,27 @@
// moqt_simulator simulates the behavior of MoQ Transport under various network
// conditions and application settings.
-#include <cmath>
-#include <cstddef>
#include <cstdint>
-#include <cstring>
#include <iostream>
-#include <memory>
-#include <optional>
#include <string>
-#include <utility>
-#include <variant>
-#include <vector>
-#include "absl/algorithm/container.h"
-#include "absl/base/casts.h"
-#include "absl/container/flat_hash_map.h"
#include "absl/strings/ascii.h"
-#include "absl/strings/str_cat.h"
-#include "absl/strings/str_format.h"
-#include "absl/strings/str_join.h"
-#include "absl/strings/str_replace.h"
#include "absl/strings/string_view.h"
#include "absl/time/time.h"
-#include "quiche/quic/core/crypto/quic_random.h"
#include "quiche/quic/core/quic_bandwidth.h"
-#include "quiche/quic/core/quic_clock.h"
#include "quiche/quic/core/quic_time.h"
#include "quiche/quic/core/quic_types.h"
-#include "quiche/quic/moqt/moqt_bitrate_adjuster.h"
-#include "quiche/quic/moqt/moqt_known_track_publisher.h"
-#include "quiche/quic/moqt/moqt_messages.h"
-#include "quiche/quic/moqt/moqt_outgoing_queue.h"
#include "quiche/quic/moqt/moqt_priority.h"
-#include "quiche/quic/moqt/moqt_session.h"
-#include "quiche/quic/moqt/moqt_session_interface.h"
-#include "quiche/quic/moqt/moqt_trace_recorder.h"
-#include "quiche/quic/moqt/test_tools/moqt_simulator_harness.h"
-#include "quiche/quic/test_tools/simulator/actor.h"
-#include "quiche/quic/test_tools/simulator/link.h"
-#include "quiche/quic/test_tools/simulator/port.h"
-#include "quiche/quic/test_tools/simulator/simulator.h"
-#include "quiche/quic/test_tools/simulator/switch.h"
+#include "quiche/quic/moqt/test_tools/moqt_simulator.h"
#include "quiche/common/platform/api/quiche_command_line_flags.h"
-#include "quiche/common/platform/api/quiche_logging.h"
-#include "quiche/common/quiche_buffer_allocator.h"
-#include "quiche/common/quiche_data_reader.h"
-#include "quiche/common/quiche_data_writer.h"
-#include "quiche/common/quiche_mem_slice.h"
-#include "quiche/common/simple_buffer_allocator.h"
-namespace moqt::test {
namespace {
-using ::quiche::QuicheBuffer;
-using ::quiche::QuicheMemSlice;
-
using ::quic::QuicBandwidth;
using ::quic::QuicByteCount;
-using ::quic::QuicClock;
-using ::quic::QuicTime;
using ::quic::QuicTimeDelta;
-using ::quic::simulator::Endpoint;
-using ::quic::simulator::Simulator;
-
-// In the simulation, the server link is supposed to be the bottleneck, so this
-// value just has to be sufficiently larger than the server link bandwidth.
-constexpr QuicBandwidth kClientLinkBandwidth =
- QuicBandwidth::FromBitsPerSecond(10.0e6);
-constexpr MoqtVersion kMoqtVersion = kDefaultMoqtVersion;
-
-// Track name used by the simulator.
-FullTrackName TrackName() { return FullTrackName("test", "track"); }
-
-// Parameters describing the scenario being simulated.
-struct SimulationParameters {
- // Bottleneck bandwidth of the simulated scenario.
- QuicBandwidth bandwidth = QuicBandwidth::FromBitsPerSecond(2.0e6);
- // Intended RTT (as computed from propagation delay alone) between the client
- // and the server.
- QuicTimeDelta min_rtt = QuicTimeDelta::FromMilliseconds(20);
- // The size of the network queue; if zero, assumed to be twice the BDP.
- QuicByteCount network_queue_size = 0;
- // Duration for which the simulation is run.
- QuicTimeDelta duration = QuicTimeDelta::FromSeconds(60);
- // Packet aggregation timeout. If zero, this will be set to the quarter of
- // min RTT.
- QuicTimeDelta aggregation_timeout = QuicTimeDelta::Zero();
- // Packet aggregation threshold. If zero, packet aggregation is disabled.
- QuicByteCount aggregation_threshold = 0;
-
- // Count frames as useful only if they were received `deadline` after which
- // they were generated.
- QuicTimeDelta deadline = QuicTimeDelta::FromSeconds(2);
- // Delivery order used by the publisher.
- MoqtDeliveryOrder delivery_order = MoqtDeliveryOrder::kDescending;
- // Delivery timeout for the subscription. This is mechanically independent
- // from `deadline`, which is an accounting-only parameter (in practice, those
- // should probably be close).
- QuicTimeDelta delivery_timeout = QuicTimeDelta::Infinite();
- // Whether MoqtBitrateAdjuster is enabled.
- bool bitrate_adaptation = true;
- // Use alternative delivery timeout design.
- bool alternative_timeout = false;
-
- // Number of frames in an individual group.
- int keyframe_interval = 30 * 2;
- // Number of frames generated per second.
- int fps = 30;
- // The ratio by which an I-frame is bigger than a P-frame.
- float i_to_p_ratio = 2 / 1;
- // The target bitrate of the data being exchanged.
- QuicBandwidth bitrate = QuicBandwidth::FromBitsPerSecond(1.0e6);
-
- // Adds random packet loss rate, as a fraction.
- float packet_loss_rate = 0.0f;
-
- // If non-zero, makes the traffic disappear in the middle of the connection
- // for the specified duration.
- quic::QuicTimeDelta blackhole_duration = QuicTimeDelta::Zero();
-};
-
-std::string FormatPercentage(size_t n, size_t total) {
- float percentage = 100.0f * n / total;
- return absl::StrFormat("%d / %d (%.2f%%)", n, total, percentage);
-}
-
-using OutputField = std::pair<absl::string_view, std::string>;
-
-OutputField OutputFraction(absl::string_view key, size_t n, size_t total) {
- float fraction = static_cast<float>(n) / total;
- return OutputField(key, absl::StrCat(fraction));
-}
-
-float RandFloat(quic::QuicRandom& rng) {
- uint32_t number;
- rng.RandBytes(&number, sizeof(number));
- return absl::bit_cast<float>((number & 0x7fffff) | 0x3f800000) - 1.0f;
-}
-
-// Box that enacts MoQT simulator specific modifications to the traffic.
-class ModificationBox : public Endpoint,
- public quic::simulator::UnconstrainedPortInterface {
- public:
- ModificationBox(Endpoint* wrapped_endpoint,
- const SimulationParameters& parameters)
- : Endpoint(
- wrapped_endpoint->simulator(),
- absl::StrCat(wrapped_endpoint->name(), " (moedification box)")),
- wrapped_endpoint_(*wrapped_endpoint),
- parameters_(parameters) {}
-
- void OnBeforeSimulationStart() {
- if (!parameters_.blackhole_duration.IsZero()) {
- float offset =
- 0.5f + RandFloat(*simulator()->GetRandomGenerator()) * 0.2f;
- blackhole_start_time_ =
- simulator()->GetClock()->Now() + offset * parameters_.duration;
- }
- }
-
- // Endpoint implementation.
- void Act() override {}
- quic::simulator::UnconstrainedPortInterface* GetRxPort() override {
- return this;
- }
- void SetTxPort(quic::simulator::ConstrainedPortInterface* port) override {
- return wrapped_endpoint_.SetTxPort(port);
- }
-
- // UnconstrainedPortInterface implementation.
- void AcceptPacket(std::unique_ptr<quic::simulator::Packet> packet) {
- quic::QuicRandom* const rng = simulator()->GetRandomGenerator();
- const quic::QuicTime now = simulator()->GetClock()->Now();
- bool drop = false;
- if (parameters_.packet_loss_rate > 0) {
- if (RandFloat(*rng) < parameters_.packet_loss_rate) {
- drop = true;
- }
- }
- if (blackhole_start_time_.has_value()) {
- quic::QuicTime blackhole_end_time =
- *blackhole_start_time_ + parameters_.blackhole_duration;
- if (now >= blackhole_start_time_ && now < blackhole_end_time) {
- drop = true;
- }
- }
- if (!drop) {
- wrapped_endpoint_.GetRxPort()->AcceptPacket(std::move(packet));
- }
- }
-
- private:
- Endpoint& wrapped_endpoint_;
- SimulationParameters parameters_;
- std::optional<QuicTime> blackhole_start_time_;
-};
-
-// Generates test objects at a constant rate. The first eight bytes of every
-// object generated is a timestamp, the rest is all zeroes. The first object in
-// the group can be made bigger than the rest, to simulate the profile of real
-// video bitstreams.
-class ObjectGenerator : public quic::simulator::Actor,
- public moqt::BitrateAdjustable {
- public:
- ObjectGenerator(Simulator* simulator, const std::string& actor_name,
- MoqtSession* session, FullTrackName track_name,
- int keyframe_interval, int fps, float i_to_p_ratio,
- QuicBandwidth bitrate)
- : Actor(simulator, actor_name),
- queue_(std::make_shared<MoqtOutgoingQueue>(
- track_name, MoqtForwardingPreference::kSubgroup,
- simulator->GetClock())),
- keyframe_interval_(keyframe_interval),
- time_between_frames_(QuicTimeDelta::FromMicroseconds(1.0e6 / fps)),
- i_to_p_ratio_(i_to_p_ratio),
- bitrate_(bitrate),
- bitrate_history_({bitrate}) {}
-
- void Act() override {
- ++frame_number_;
- bool i_frame = (frame_number_ % keyframe_interval_) == 0;
- size_t size = GetFrameSize(i_frame);
-
- QuicheBuffer buffer(quiche::SimpleBufferAllocator::Get(), size);
- memset(buffer.data(), 0, buffer.size());
- quiche::QuicheDataWriter writer(size, buffer.data());
- bool success = writer.WriteUInt64(clock_->Now().ToDebuggingValue());
- QUICHE_CHECK(success);
-
- queue_->AddObject(QuicheMemSlice(std::move(buffer)), i_frame);
- Schedule(clock_->Now() + time_between_frames_);
- }
-
- void Start() { Schedule(clock_->Now()); }
- void Stop() { Unschedule(); }
-
- std::shared_ptr<MoqtOutgoingQueue> queue() { return queue_; }
- size_t total_objects_sent() const { return frame_number_ + 1; }
-
- size_t GetFrameSize(bool i_frame) const {
- int p_frame_count = keyframe_interval_ - 1;
- // Compute the frame sizes as a fraction of the total group size.
- float i_frame_fraction = i_to_p_ratio_ / (i_to_p_ratio_ + p_frame_count);
- float p_frame_fraction = 1.0 / (i_to_p_ratio_ + p_frame_count);
- float frame_fraction = i_frame ? i_frame_fraction : p_frame_fraction;
-
- QuicTimeDelta group_duration = time_between_frames_ * keyframe_interval_;
- QuicByteCount group_byte_count = group_duration * bitrate_;
- size_t frame_size = std::ceil(frame_fraction * group_byte_count);
- QUICHE_CHECK_GE(frame_size, 8u)
- << "Frame size is too small for a timestamp";
- return frame_size;
- }
-
- quic::QuicBandwidth GetCurrentBitrate() const override { return bitrate_; }
- bool CouldUseExtraBandwidth() override { return true; }
- void ConsiderAdjustingBitrate(quic::QuicBandwidth bandwidth,
- BitrateAdjustmentType type) override {
- if (moqt::ShouldIgnoreBitrateAdjustment(bandwidth, type, bitrate_,
- /*min_change=*/0.01)) {
- return;
- }
- bitrate_ = bandwidth;
- bitrate_history_.push_back(bandwidth);
- }
- std::string FormatBitrateHistory() const {
- std::vector<std::string> bits;
- bits.reserve(bitrate_history_.size());
- for (QuicBandwidth bandwidth : bitrate_history_) {
- bits.push_back(absl::StrCat(bandwidth));
- }
- return absl::StrJoin(bits, " -> ");
- }
-
- private:
- std::shared_ptr<MoqtOutgoingQueue> queue_;
- int keyframe_interval_;
- QuicTimeDelta time_between_frames_;
- float i_to_p_ratio_;
- QuicBandwidth bitrate_;
- int frame_number_ = -1;
- std::vector<QuicBandwidth> bitrate_history_;
-};
-
-class ObjectReceiver : public SubscribeVisitor {
- public:
- explicit ObjectReceiver(const QuicClock* clock, QuicTimeDelta deadline)
- : clock_(clock), deadline_(deadline) {}
-
- void OnReply(
- const FullTrackName& full_track_name,
- std::variant<SubscribeOkData, MoqtRequestError> response) override {
- QUICHE_CHECK(full_track_name == TrackName());
- if (std::holds_alternative<MoqtRequestError>(response)) {
- MoqtRequestError error = std::get<MoqtRequestError>(response);
- QUICHE_CHECK(!error.reason_phrase.empty()) << error.reason_phrase;
- }
- }
-
- void OnCanAckObjects(MoqtObjectAckFunction ack_function) override {
- object_ack_function_ = std::move(ack_function);
- }
-
- void OnObjectFragment(const FullTrackName& full_track_name,
- const PublishedObjectMetadata& metadata,
- absl::string_view object,
- bool end_of_message) override {
- QUICHE_DCHECK(full_track_name == TrackName());
- if (metadata.status != MoqtObjectStatus::kNormal) {
- QUICHE_DCHECK(end_of_message);
- return;
- }
- if (!end_of_message) {
- QUICHE_LOG(DFATAL) << "Partial receiving of objects wasn't enabled";
- return;
- }
- OnFullObject(metadata.location, object);
- }
-
- void OnPublishDone(FullTrackName /*full_track_name*/) override {}
- void OnMalformedTrack(const FullTrackName& /*full_track_name*/) override {}
- void OnStreamFin(const FullTrackName&, DataStreamIndex) override {}
- void OnStreamReset(const FullTrackName&, DataStreamIndex) override {}
-
- void OnFullObject(Location sequence, absl::string_view payload) {
- QUICHE_CHECK_GE(payload.size(), 8u);
- quiche::QuicheDataReader reader(payload);
- uint64_t time_us;
- reader.ReadUInt64(&time_us);
- QuicTime time = QuicTime::Zero() + QuicTimeDelta::FromMicroseconds(time_us);
- QuicTimeDelta delay = clock_->Now() - time;
- QUICHE_CHECK_GT(delay, QuicTimeDelta::Zero());
- QUICHE_DCHECK(absl::c_all_of(reader.ReadRemainingPayload(),
- [](char c) { return c == 0; }));
- ++full_objects_received_;
- if (delay > deadline_) {
- ++full_objects_received_late_;
- } else {
- ++full_objects_received_on_time_;
- total_bytes_received_on_time_ += payload.size();
- }
- if (object_ack_function_) {
- object_ack_function_(sequence.group, sequence.object, deadline_ - delay);
- }
- }
-
- size_t full_objects_received() const { return full_objects_received_; }
- size_t full_objects_received_on_time() const {
- return full_objects_received_on_time_;
- }
- size_t full_objects_received_late() const {
- return full_objects_received_late_;
- }
- size_t total_bytes_received_on_time() const {
- return total_bytes_received_on_time_;
- }
-
- private:
- const QuicClock* clock_ = nullptr;
- // TODO: figure out when partial objects should be discarded.
- absl::flat_hash_map<Location, std::string> partial_objects_;
- MoqtObjectAckFunction object_ack_function_ = nullptr;
-
- size_t full_objects_received_ = 0;
-
- QuicTimeDelta deadline_;
- size_t full_objects_received_on_time_ = 0;
- size_t full_objects_received_late_ = 0;
- size_t total_bytes_received_on_time_ = 0;
-};
-
-// Computes the size of the network queue on the switch.
-constexpr QuicByteCount AdjustedQueueSize(
- const SimulationParameters& parameters) {
- if (parameters.network_queue_size > 0) {
- return parameters.network_queue_size;
- }
- QuicByteCount bdp = parameters.bandwidth * parameters.min_rtt;
- return 2 * bdp;
-}
-
-// Simulates the performance of MoQT transfer under the specified network
-// conditions.
-class MoqtSimulator {
- public:
- explicit MoqtSimulator(const SimulationParameters& parameters)
- : simulator_(quic::QuicRandom::GetInstance()),
- client_endpoint_(&simulator_, "Client", "Server", kMoqtVersion),
- server_endpoint_(&simulator_, "Server", "Client", kMoqtVersion),
- switch_(&simulator_, "Switch", 8, AdjustedQueueSize(parameters)),
- modification_box_(switch_.port(1), parameters),
- client_link_(&client_endpoint_, &modification_box_,
- kClientLinkBandwidth, parameters.min_rtt * 0.25),
- server_link_(&server_endpoint_, switch_.port(2), parameters.bandwidth,
- parameters.min_rtt * 0.25),
- generator_(&simulator_, "Client generator", client_endpoint_.session(),
- TrackName(), parameters.keyframe_interval, parameters.fps,
- parameters.i_to_p_ratio, parameters.bitrate),
- receiver_(simulator_.GetClock(), parameters.deadline),
- adjuster_(simulator_.GetClock(), client_endpoint_.session()->session(),
- &generator_),
- parameters_(parameters) {
- if (parameters.aggregation_threshold > 0) {
- QuicTimeDelta timeout = parameters.aggregation_timeout;
- if (timeout.IsZero()) {
- timeout = parameters.min_rtt * 0.25;
- }
- switch_.port_queue(2)->EnableAggregation(parameters.aggregation_threshold,
- timeout);
- }
- client_endpoint_.RecordTrace();
- QUICHE_DCHECK(client_endpoint_.trace_visitor() != nullptr);
- client_endpoint_.session()->trace_recorder().set_trace(
- client_endpoint_.trace_visitor()->trace());
- }
-
- MoqtSession* client_session() { return client_endpoint_.session(); }
- MoqtSession* server_session() { return server_endpoint_.session(); }
-
- std::string GetClientSessionCongestionControl() {
- return quic::CongestionControlTypeToString(
- client_endpoint_.quic_session()
- ->connection()
- ->sent_packet_manager()
- .GetSendAlgorithm()
- ->GetCongestionControlType());
- }
-
- // Runs the simulation and outputs the results to stdout.
- void Run() {
- // Perform the QUIC and the MoQT handshake.
- client_session()->set_support_object_acks(true);
- server_session()->set_support_object_acks(true);
- RunHandshakeOrDie(simulator_, client_endpoint_, server_endpoint_);
-
- generator_.queue()->SetDeliveryOrder(parameters_.delivery_order);
- client_session()->set_publisher(&publisher_);
- if (parameters_.bitrate_adaptation) {
- client_session()->SetMonitoringInterfaceForTrack(TrackName(), &adjuster_);
- }
- if (parameters_.alternative_timeout) {
- client_session()->UseAlternateDeliveryTimeout();
- }
- publisher_.Add(generator_.queue());
- modification_box_.OnBeforeSimulationStart();
-
- // The simulation is started as follows. At t=0:
- // (1) The server issues a subscribe request.
- // (2) The client starts immediately generating data. At this point, the
- // server does not yet have an active subscription, so the client has
- // some catching up to do.
- generator_.Start();
- VersionSpecificParameters subscription_parameters;
- if (parameters_.bitrate_adaptation) {
- subscription_parameters.oack_window_size = parameters_.deadline;
- }
- if (!parameters_.delivery_timeout.IsInfinite()) {
- subscription_parameters.delivery_timeout = parameters_.delivery_timeout;
- }
- server_session()->RelativeJoiningFetch(TrackName(), &receiver_, 0,
- subscription_parameters);
- simulator_.RunFor(parameters_.duration);
-
- // At the end, we wait for eight RTTs until the connection settles down.
- generator_.Stop();
- wait_at_the_end_ =
- 8 * client_endpoint_.quic_session()->GetSessionStats().smoothed_rtt;
- simulator_.RunFor(QuicTimeDelta(wait_at_the_end_));
- }
-
- void HumanReadableOutput() {
- const QuicTimeDelta total_time =
- parameters_.duration + QuicTimeDelta(wait_at_the_end_);
- absl::PrintF("Ran simulation for %v + %.1fms\n", parameters_.duration,
- absl::ToDoubleMilliseconds(wait_at_the_end_));
- absl::PrintF("Congestion control used: %s\n",
- GetClientSessionCongestionControl());
-
- size_t total_sent = generator_.total_objects_sent();
- size_t missing_objects =
- generator_.total_objects_sent() - receiver_.full_objects_received();
- absl::PrintF(
- "Objects received: %s\n",
- FormatPercentage(receiver_.full_objects_received(), total_sent));
- absl::PrintF(" on time: %s\n",
- FormatPercentage(receiver_.full_objects_received_on_time(),
- total_sent));
- absl::PrintF(
- " late: %s\n",
- FormatPercentage(receiver_.full_objects_received_late(), total_sent));
- absl::PrintF(" never: %s\n",
- FormatPercentage(missing_objects, total_sent));
- absl::PrintF("\n");
- absl::PrintF("Average on-time goodput: %v\n",
- QuicBandwidth::FromBytesAndTimeDelta(
- receiver_.total_bytes_received_on_time(), total_time));
- absl::PrintF("Bitrates: %s\n", generator_.FormatBitrateHistory());
- }
-
- void CustomOutput(absl::string_view format) {
- size_t total_sent = generator_.total_objects_sent();
- std::vector<OutputField> fields;
- fields.push_back(OutputFraction("{on_time_fraction}",
- receiver_.full_objects_received_on_time(),
- total_sent));
- fields.push_back(OutputFraction(
- "{late_fraction}", receiver_.full_objects_received_late(), total_sent));
- size_t missing_objects =
- generator_.total_objects_sent() - receiver_.full_objects_received();
- fields.push_back(
- OutputFraction("{missing_fraction}", missing_objects, total_sent));
- std::string output = absl::StrReplaceAll(format, fields);
- std::cout << output << std::endl;
- }
-
- private:
- Simulator simulator_;
- MoqtClientEndpoint client_endpoint_;
- MoqtServerEndpoint server_endpoint_;
- quic::simulator::Switch switch_;
- ModificationBox modification_box_;
- quic::simulator::SymmetricLink client_link_;
- quic::simulator::SymmetricLink server_link_;
- MoqtKnownTrackPublisher publisher_;
- ObjectGenerator generator_;
- ObjectReceiver receiver_;
- MoqtBitrateAdjuster adjuster_;
- SimulationParameters parameters_;
-
- absl::Duration wait_at_the_end_;
-};
-
} // namespace
-} // namespace moqt::test
DEFINE_QUICHE_COMMAND_LINE_FLAG(
uint64_t, bandwidth,