Implement a very basic MoQT simulator. This should help us tune MoQT performance by having a simulated lab setup that lets us iterate easily. This CL also fixes a bunch of code that spams irrelevant messages into logs when running the simulator. PiperOrigin-RevId: 642365065
diff --git a/build/source_list.bzl b/build/source_list.bzl index e4bfad3..bbe3b4c 100644 --- a/build/source_list.bzl +++ b/build/source_list.bzl
@@ -1534,6 +1534,7 @@ "quic/moqt/tools/moqt_end_to_end_test.cc", "quic/moqt/tools/moqt_ingestion_server_bin.cc", "quic/moqt/tools/moqt_server.cc", + "quic/moqt/tools/moqt_simulator_bin.cc", ] binary_http_hdrs = [ "binary_http/binary_http_message.h",
diff --git a/build/source_list.gni b/build/source_list.gni index 60c5103..3429497 100644 --- a/build/source_list.gni +++ b/build/source_list.gni
@@ -1538,6 +1538,7 @@ "src/quiche/quic/moqt/tools/moqt_end_to_end_test.cc", "src/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc", "src/quiche/quic/moqt/tools/moqt_server.cc", + "src/quiche/quic/moqt/tools/moqt_simulator_bin.cc", ] binary_http_hdrs = [ "src/quiche/binary_http/binary_http_message.h",
diff --git a/build/source_list.json b/build/source_list.json index b34b8f9..12fec03 100644 --- a/build/source_list.json +++ b/build/source_list.json
@@ -1536,7 +1536,8 @@ "quiche/quic/moqt/tools/moqt_client.cc", "quiche/quic/moqt/tools/moqt_end_to_end_test.cc", "quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc", - "quiche/quic/moqt/tools/moqt_server.cc" + "quiche/quic/moqt/tools/moqt_server.cc", + "quiche/quic/moqt/tools/moqt_simulator_bin.cc" ], "binary_http_hdrs": [ "quiche/binary_http/binary_http_message.h"
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h index da72fea..8d68733 100644 --- a/quiche/quic/moqt/moqt_messages.h +++ b/quiche/quic/moqt/moqt_messages.h
@@ -14,6 +14,7 @@ #include <utility> #include <vector> +#include "absl/strings/str_format.h" #include "absl/strings/string_view.h" #include "quiche/quic/core/quic_time.h" #include "quiche/quic/core/quic_types.h" @@ -157,6 +158,11 @@ } template <typename H> friend H AbslHashValue(H h, const FullSequence& m); + + template <typename Sink> + friend void AbslStringify(Sink& sink, const FullSequence& sequence) { + absl::Format(&sink, "(%d; %d)", sequence.group, sequence.object); + } }; template <typename H>
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc index 06d2808..bb86c12 100644 --- a/quiche/quic/moqt/moqt_session.cc +++ b/quiche/quic/moqt/moqt_session.cc
@@ -482,11 +482,11 @@ ++failures; continue; } - QUICHE_LOG(INFO) << ENDPOINT << "Sending object length " << payload.length() - << " for " << full_track_name.track_namespace << ":" - << full_track_name.track_name << " with sequence " - << object.group_id << ":" << object.object_id - << " on stream " << *stream_id; + QUICHE_DVLOG(1) << ENDPOINT << "Sending object length " << payload.length() + << " for " << full_track_name.track_namespace << ":" + << full_track_name.track_name << " with sequence " + << object.group_id << ":" << object.object_id + << " on stream " << *stream_id; if (end_of_stream && !new_stream) { subscription->RemoveStream(group_id, object_id); } @@ -567,7 +567,7 @@ "Received OBJECT message on control stream"); return; } - QUICHE_DLOG(INFO) + QUICHE_DVLOG(1) << ENDPOINT << "Received OBJECT message on stream " << stream_->GetStreamId() << " for subscribe_id " << message.subscribe_id << " for track alias " << message.track_alias << " with sequence "
diff --git a/quiche/quic/moqt/tools/moqt_simulator_bin.cc b/quiche/quic/moqt/tools/moqt_simulator_bin.cc new file mode 100644 index 0000000..bf11d97 --- /dev/null +++ b/quiche/quic/moqt/tools/moqt_simulator_bin.cc
@@ -0,0 +1,325 @@ +// Copyright 2024 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +// moqt_simulator simulates the behavior of MoQ Transport under various network +// conditions and application settings. + +#include <cstddef> +#include <cstdint> +#include <cstring> +#include <iostream> +#include <optional> +#include <string> +#include <utility> + +#include "absl/algorithm/container.h" +#include "absl/container/flat_hash_map.h" +#include "absl/strings/str_cat.h" +#include "absl/strings/string_view.h" +#include "quiche/quic/core/crypto/quic_random.h" +#include "quiche/quic/core/quic_bandwidth.h" +#include "quiche/quic/core/quic_clock.h" +#include "quiche/quic/core/quic_time.h" +#include "quiche/quic/core/quic_types.h" +#include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/moqt/moqt_outgoing_queue.h" +#include "quiche/quic/moqt/moqt_session.h" +#include "quiche/quic/moqt/moqt_track.h" +#include "quiche/quic/moqt/test_tools/moqt_simulator_harness.h" +#include "quiche/quic/test_tools/simulator/actor.h" +#include "quiche/quic/test_tools/simulator/link.h" +#include "quiche/quic/test_tools/simulator/simulator.h" +#include "quiche/quic/test_tools/simulator/switch.h" +#include "quiche/common/platform/api/quiche_command_line_flags.h" +#include "quiche/common/platform/api/quiche_logging.h" +#include "quiche/common/platform/api/quiche_mem_slice.h" +#include "quiche/common/quiche_buffer_allocator.h" +#include "quiche/common/quiche_data_reader.h" +#include "quiche/common/quiche_data_writer.h" +#include "quiche/common/simple_buffer_allocator.h" + +namespace moqt::test { +namespace { + +using ::quiche::QuicheBuffer; +using ::quiche::QuicheMemSlice; + +using ::quic::QuicBandwidth; +using ::quic::QuicByteCount; +using ::quic::QuicClock; +using ::quic::QuicTime; +using ::quic::QuicTimeDelta; + +using ::quic::simulator::Simulator; + +// In the simulation, the server link is supposed to be the bottleneck, so this +// value just has to be sufficiently larger than the server link bandwidth. +constexpr QuicBandwidth kClientLinkBandwidth = + QuicBandwidth::FromBitsPerSecond(10.0e6); +constexpr MoqtVersion kMoqtVersion = MoqtVersion::kDraft04; + +// Track name used by the simulator. +FullTrackName TrackName() { return FullTrackName("test", "track"); } + +// Parameters describing the scenario being simulated. +struct SimulationParameters { + // Bottleneck bandwidth of the simulated scenario. + QuicBandwidth bandwidth = QuicBandwidth::FromBitsPerSecond(2.0e6); + // Intended RTT (as computed from propagation delay alone) between the client + // and the server. + QuicTimeDelta min_rtt = QuicTimeDelta::FromMilliseconds(20); + // The size of the network queue; if zero, assumed to be twice the BDP. + QuicByteCount network_queue_size = 0; + // Duration for which the simulation is run. + QuicTimeDelta duration = QuicTimeDelta::FromSeconds(60); + + // Number of frames in an individual group. + int keyframe_interval = 30 * 2; + // Number of frames generated per second. + int fps = 30; + // The ratio by which an I-frame is bigger than a P-frame. + float i_to_p_ratio = 2 / 1; + // The target bitrate of the data being exchanged. + QuicBandwidth bitrate = QuicBandwidth::FromBitsPerSecond(1.0e6); +}; + +// Generates test objects at a constant rate. The first eight bytes of every +// object generated is a timestamp, the rest is all zeroes. The first object in +// the group can be made bigger than the rest, to simulate the profile of real +// video bitstreams. +class ObjectGenerator : public quic::simulator::Actor { + public: + ObjectGenerator(Simulator* simulator, const std::string& actor_name, + MoqtSession* session, FullTrackName track_name, + int keyframe_interval, int fps, float i_to_p_ratio, + QuicBandwidth bitrate) + : Actor(simulator, actor_name), + queue_(session, track_name), + keyframe_interval_(keyframe_interval), + time_between_frames_(QuicTimeDelta::FromMicroseconds(1.0e6 / fps)) { + int p_frame_count = keyframe_interval - 1; + // Compute the frame sizes as a fraction of the total group size. + float i_frame_fraction = i_to_p_ratio / (i_to_p_ratio + p_frame_count); + float p_frame_fraction = 1.0 / (i_to_p_ratio + p_frame_count); + + QuicTimeDelta group_duration = + QuicTimeDelta::FromMicroseconds(1.0e6 * keyframe_interval / fps); + QuicByteCount group_byte_count = group_duration * bitrate; + i_frame_size_ = i_frame_fraction * group_byte_count; + p_frame_size_ = p_frame_fraction * group_byte_count; + QUICHE_CHECK_GE(i_frame_size_, 8u) << "Not enough space for a timestamp"; + QUICHE_CHECK_GE(p_frame_size_, 8u) << "Not enough space for a timestamp"; + } + + void Act() override { + ++frame_number_; + bool i_frame = (frame_number_ % keyframe_interval_) == 0; + size_t size = i_frame ? i_frame_size_ : p_frame_size_; + + QuicheBuffer buffer(quiche::SimpleBufferAllocator::Get(), size); + memset(buffer.data(), 0, buffer.size()); + quiche::QuicheDataWriter writer(size, buffer.data()); + bool success = writer.WriteUInt64(clock_->Now().ToDebuggingValue()); + QUICHE_CHECK(success); + + queue_.AddObject(QuicheMemSlice(std::move(buffer)), i_frame); + Schedule(clock_->Now() + time_between_frames_); + } + + void Start() { Schedule(clock_->Now()); } + void Stop() { Unschedule(); } + + MoqtOutgoingQueue& queue() { return queue_; } + size_t total_objects_sent() const { return frame_number_ + 1; } + + private: + MoqtOutgoingQueue queue_; + int keyframe_interval_; + QuicTimeDelta time_between_frames_; + QuicByteCount i_frame_size_; + QuicByteCount p_frame_size_; + int frame_number_ = -1; +}; + +class ObjectReceiver : public RemoteTrack::Visitor { + public: + explicit ObjectReceiver(const QuicClock* clock) : clock_(clock) {} + + void OnReply(const FullTrackName& full_track_name, + std::optional<absl::string_view> error_reason_phrase) override { + QUICHE_CHECK(full_track_name == TrackName()); + QUICHE_CHECK(!error_reason_phrase.has_value()) << *error_reason_phrase; + } + + void OnObjectFragment(const FullTrackName& full_track_name, + uint64_t group_sequence, uint64_t object_sequence, + uint64_t /*object_send_order*/, + MoqtForwardingPreference /*forwarding_preference*/, + absl::string_view object, + bool end_of_message) override { + QUICHE_DCHECK(full_track_name == TrackName()); + + // Buffer and assemble partially available objects. + // TODO: this logic should be factored out. Also, this should take advantage + // of the fact that in the current MoQT, the object size is known in + // advance. + FullSequence sequence{group_sequence, object_sequence}; + if (!end_of_message) { + auto [it, unused] = partial_objects_.try_emplace(sequence); + it->second.append(object); + return; + } + auto it = partial_objects_.find(sequence); + if (it == partial_objects_.end()) { + OnFullObject(sequence, object); + return; + } + std::string full_object = std::move(it->second); + full_object.append(object); + partial_objects_.erase(it); + OnFullObject(sequence, full_object); + } + + void OnFullObject(FullSequence sequence, absl::string_view payload) { + QUICHE_CHECK_GE(payload.size(), 8u); + quiche::QuicheDataReader reader(payload); + uint64_t time_us; + reader.ReadUInt64(&time_us); + QuicTime time = QuicTime::Zero() + QuicTimeDelta::FromMicroseconds(time_us); + QuicTimeDelta delay = clock_->Now() - time; + QUICHE_CHECK_GT(delay, QuicTimeDelta::Zero()); + QUICHE_DCHECK(absl::c_all_of(reader.ReadRemainingPayload(), + [](char c) { return c == 0; })); + ++full_objects_received_; + } + + size_t full_objects_received() const { return full_objects_received_; } + + private: + const QuicClock* clock_ = nullptr; + // TODO: figure out when partial objects should be discarded. + absl::flat_hash_map<FullSequence, std::string> partial_objects_; + + size_t full_objects_received_ = 0; +}; + +// Computes the size of the network queue on the switch. +constexpr QuicByteCount AdjustedQueueSize( + const SimulationParameters& parameters) { + if (parameters.network_queue_size > 0) { + return parameters.network_queue_size; + } + QuicByteCount bdp = parameters.bandwidth * parameters.min_rtt; + return 2 * bdp; +} + +// Simulates the performance of MoQT transfer under the specified network +// conditions. +class MoqtSimulator { + public: + explicit MoqtSimulator(const SimulationParameters& parameters) + : simulator_(quic::QuicRandom::GetInstance()), + client_endpoint_(&simulator_, "Client", "Server", kMoqtVersion), + server_endpoint_(&simulator_, "Server", "Client", kMoqtVersion), + switch_(&simulator_, "Switch", 8, AdjustedQueueSize(parameters)), + client_link_(&client_endpoint_, switch_.port(1), kClientLinkBandwidth, + parameters.min_rtt * 0.25), + server_link_(&server_endpoint_, switch_.port(2), parameters.bandwidth, + parameters.min_rtt * 0.25), + generator_(&simulator_, "Client generator", client_endpoint_.session(), + TrackName(), parameters.keyframe_interval, parameters.fps, + parameters.i_to_p_ratio, parameters.bitrate), + receiver_(simulator_.GetClock()), + parameters_(parameters) {} + + MoqtSession* client_session() { return client_endpoint_.session(); } + MoqtSession* server_session() { return server_endpoint_.session(); } + + std::string GetClientSessionCongestionControl() { + return quic::CongestionControlTypeToString( + client_endpoint_.quic_session() + ->connection() + ->sent_packet_manager() + .GetSendAlgorithm() + ->GetCongestionControlType()); + } + + // Runs the simulation and outputs the results to stdout. + void Run() { + // Timeout for establishing the connection. + constexpr QuicTimeDelta kConnectionTimeout = QuicTimeDelta::FromSeconds(1); + + // Perform the QUIC and the MoQT handshake. + client_session()->callbacks().session_established_callback = [this] { + client_established_ = true; + }; + server_session()->callbacks().session_established_callback = [this] { + server_established_ = true; + }; + client_endpoint_.quic_session()->CryptoConnect(); + simulator_.RunUntilOrTimeout( + [&]() { return client_established_ && server_established_; }, + kConnectionTimeout); + QUICHE_CHECK(client_established_) << "Client failed to establish session"; + QUICHE_CHECK(server_established_) << "Server failed to establish session"; + + // The simulation is started as follows. At t=0: + // (1) The server issues a subscribe request. + // (2) The client starts immediately generating data. At this point, the + // server does not yet have an active subscription, so the client has + // some catching up to do. + client_session()->AddLocalTrack( + TrackName(), MoqtForwardingPreference::kGroup, &generator_.queue()); + generator_.Start(); + server_session()->SubscribeCurrentGroup(TrackName().track_namespace, + TrackName().track_name, &receiver_); + simulator_.RunFor(parameters_.duration); + + // At the end, we wait for eight RTTs until the connection settles down. + generator_.Stop(); + simulator_.RunFor(QuicTimeDelta( + 8 * client_endpoint_.quic_session()->GetSessionStats().smoothed_rtt)); + + std::cout << "Ran simulation for " << parameters_.duration << std::endl; + std::cout << "Congestion control used : " + << GetClientSessionCongestionControl() << std::endl; + std::cout << "Objects sent: " << generator_.total_objects_sent() + << std::endl; + std::cout << "Objects received: " << receiver_.full_objects_received() + << std::endl; + } + + private: + Simulator simulator_; + MoqtClientEndpoint client_endpoint_; + MoqtServerEndpoint server_endpoint_; + quic::simulator::Switch switch_; + quic::simulator::SymmetricLink client_link_; + quic::simulator::SymmetricLink server_link_; + ObjectGenerator generator_; + ObjectReceiver receiver_; + SimulationParameters parameters_; + + bool client_established_ = false; + bool server_established_ = false; +}; + +} // namespace +} // namespace moqt::test + +DEFINE_QUICHE_COMMAND_LINE_FLAG( + uint64_t, bandwidth, + moqt::test::SimulationParameters().bandwidth.ToKBitsPerSecond(), + "Bandwidth of the simulated link, in kilobits per second."); + +int main(int argc, char** argv) { + moqt::test::SimulationParameters parameters; + quiche::QuicheParseCommandLineFlags("moqt_simulator", argc, argv); + parameters.bandwidth = quic::QuicBandwidth::FromKBitsPerSecond( + quiche::GetQuicheCommandLineFlag(FLAGS_bandwidth)); + + moqt::test::MoqtSimulator simulator(parameters); + simulator.Run(); + return 0; +}
diff --git a/quiche/quic/test_tools/simulator/quic_endpoint_base.cc b/quiche/quic/test_tools/simulator/quic_endpoint_base.cc index 78d285b..42d5980 100644 --- a/quiche/quic/test_tools/simulator/quic_endpoint_base.cc +++ b/quiche/quic/test_tools/simulator/quic_endpoint_base.cc
@@ -14,6 +14,7 @@ #include "quiche/quic/core/crypto/crypto_handshake_message.h" #include "quiche/quic/core/crypto/crypto_protocol.h" #include "quiche/quic/core/quic_connection.h" +#include "quiche/quic/core/quic_connection_id.h" #include "quiche/quic/core/quic_data_writer.h" #include "quiche/quic/platform/api/quic_test_output.h" #include "quiche/quic/test_tools/quic_connection_peer.h" @@ -65,7 +66,8 @@ kMaxOutgoingPacketSize * kTxQueueSize), connection_(nullptr), write_blocked_count_(0), - drop_next_packet_(false) { + drop_next_packet_(false), + connection_id_generator_(kQuicDefaultConnectionIdLength) { nic_tx_queue_.set_listener_interface(this); }
diff --git a/quiche/quic/test_tools/simulator/quic_endpoint_base.h b/quiche/quic/test_tools/simulator/quic_endpoint_base.h index fe753d5..336b2f9 100644 --- a/quiche/quic/test_tools/simulator/quic_endpoint_base.h +++ b/quiche/quic/test_tools/simulator/quic_endpoint_base.h
@@ -10,12 +10,12 @@ #include "absl/container/flat_hash_map.h" #include "quiche/quic/core/crypto/null_decrypter.h" #include "quiche/quic/core/crypto/null_encrypter.h" +#include "quiche/quic/core/deterministic_connection_id_generator.h" #include "quiche/quic/core/quic_connection.h" #include "quiche/quic/core/quic_packet_writer.h" #include "quiche/quic/core/quic_packets.h" #include "quiche/quic/core/quic_stream_frame_data_producer.h" #include "quiche/quic/core/quic_trace_visitor.h" -#include "quiche/quic/test_tools/mock_connection_id_generator.h" #include "quiche/quic/test_tools/simple_session_notifier.h" #include "quiche/quic/test_tools/simulator/link.h" #include "quiche/quic/test_tools/simulator/queue.h" @@ -131,7 +131,7 @@ std::unique_ptr<QuicTraceVisitor> trace_visitor_; - test::MockConnectionIdGenerator connection_id_generator_; + DeterministicConnectionIdGenerator connection_id_generator_; }; // Multiplexes multiple connections at the same host on the network.