Record objects enqueued in MOQT traces.
PiperOrigin-RevId: 823713706
diff --git a/quiche/quic/core/quic_trace_visitor.cc b/quiche/quic/core/quic_trace_visitor.cc
index 3b5131a..e04def2 100644
--- a/quiche/quic/core/quic_trace_visitor.cc
+++ b/quiche/quic/core/quic_trace_visitor.cc
@@ -4,6 +4,7 @@
#include "quiche/quic/core/quic_trace_visitor.h"
+#include <cstdint>
#include <string>
#include "quiche/quic/core/quic_types.h"
@@ -287,8 +288,7 @@
void QuicTraceVisitor::OnApplicationLimited() {
quic_trace::Event* event = trace_.add_events();
- event->set_time_us(
- ConvertTimestampToRecordedFormat(connection_->clock()->ApproximateNow()));
+ event->set_time_us(NowInRecordedFormat());
event->set_event_type(quic_trace::APPLICATION_LIMITED);
}
@@ -297,8 +297,7 @@
QuicByteCount /*old_cwnd*/,
QuicByteCount /*new_cwnd*/) {
quic_trace::Event* event = trace_.add_events();
- event->set_time_us(
- ConvertTimestampToRecordedFormat(connection_->clock()->ApproximateNow()));
+ event->set_time_us(NowInRecordedFormat());
event->set_event_type(quic_trace::EXTERNAL_PARAMETERS);
quic_trace::ExternalNetworkParameters* parameters =
@@ -347,4 +346,9 @@
}
}
+uint64_t QuicTraceVisitor::NowInRecordedFormat() {
+ return ConvertTimestampToRecordedFormat(
+ connection_->clock()->ApproximateNow());
+}
+
} // namespace quic
diff --git a/quiche/quic/core/quic_trace_visitor.h b/quiche/quic/core/quic_trace_visitor.h
index 3307dd0..a129797 100644
--- a/quiche/quic/core/quic_trace_visitor.h
+++ b/quiche/quic/core/quic_trace_visitor.h
@@ -5,9 +5,19 @@
#ifndef QUICHE_QUIC_CORE_QUIC_TRACE_VISITOR_H_
#define QUICHE_QUIC_CORE_QUIC_TRACE_VISITOR_H_
+#include <cstdint>
+
+#include "quiche/quic/core/frames/quic_ack_frame.h"
+#include "quiche/quic/core/frames/quic_frame.h"
+#include "quiche/quic/core/frames/quic_window_update_frame.h"
+#include "quiche/quic/core/quic_bandwidth.h"
#include "quiche/quic/core/quic_connection.h"
+#include "quiche/quic/core/quic_packet_number.h"
+#include "quiche/quic/core/quic_time.h"
#include "quiche/quic/core/quic_types.h"
+#include "quiche/quic/core/quic_versions.h"
#include "quic_trace/quic_trace.pb.h"
+#include "quiche/common/platform/api/quiche_export.h"
namespace quic {
@@ -54,17 +64,23 @@
// finished.
quic_trace::Trace* trace() { return &trace_; }
- private:
// Converts QuicTime into a microsecond delta w.r.t. the beginning of the
// connection.
uint64_t ConvertTimestampToRecordedFormat(QuicTime timestamp);
- // Populates a quic_trace::Frame message from |frame|.
- void PopulateFrameInfo(const QuicFrame& frame,
- quic_trace::Frame* frame_record);
+
+ // Returns the current trace timestamp (i.e. number of microseconds since the
+ // start of the connection).
+ uint64_t NowInRecordedFormat();
+
// Populates a quic_trace::TransportState message from the associated
// connection.
void PopulateTransportState(quic_trace::TransportState* state);
+ private:
+ // Populates a quic_trace::Frame message from |frame|.
+ void PopulateFrameInfo(const QuicFrame& frame,
+ quic_trace::Frame* frame_record);
+
quic_trace::Trace trace_;
const QuicConnection* connection_;
const QuicTime start_time_;
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc
index f870f94..66f0fd1 100644
--- a/quiche/quic/moqt/moqt_integration_test.cc
+++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -8,6 +8,7 @@
#include <string>
#include <utility>
#include <variant>
+#include <vector>
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
@@ -34,12 +35,14 @@
#include "quiche/quic/test_tools/simulator/test_harness.h"
#include "quic_trace/quic_trace.pb.h"
#include "quiche/common/platform/api/quiche_test.h"
+#include "quiche/common/quiche_mem_slice.h"
namespace moqt::test {
namespace {
using ::quic::test::MemSliceFromString;
+using ::quiche::QuicheMemSlice;
using ::testing::_;
using ::testing::Assign;
using ::testing::Return;
@@ -64,8 +67,8 @@
test_harness_.simulator().GetClock();
client_->RecordTrace();
- client_->session()->trace_recorder().set_trace(
- client_->trace_visitor()->trace());
+ client_->session()->trace_recorder().SetParentRecorder(
+ client_->trace_visitor());
}
void WireUpEndpoints() { test_harness_.WireUpEndpoints(); }
@@ -891,6 +894,7 @@
}
TEST_F(MoqtIntegrationTest, RecordTrace) {
+ constexpr absl::string_view kObjectPayload = "object";
EstablishSession();
MoqtKnownTrackPublisher publisher;
client_->session()->set_publisher(&publisher);
@@ -912,23 +916,24 @@
test_harness_.RunUntilWithDefaultTimeout([&]() { return subscribed; });
EXPECT_TRUE(success);
- queue->AddObject(MemSliceFromString("object"), /*key=*/true);
+ queue->AddObject(QuicheMemSlice::Copy(kObjectPayload), /*key=*/true);
int received = 0;
EXPECT_CALL(subscribe_visitor_,
OnObjectFragment(_,
MetadataLocationAndStatus(
Location{0, 0}, MoqtObjectStatus::kNormal),
- "object", true))
+ kObjectPayload, true))
.WillOnce([&] { ++received; });
success =
test_harness_.RunUntilWithDefaultTimeout([&]() { return received >= 1; });
EXPECT_TRUE(success);
+ const quic_trace::Trace& trace = *client_->trace_visitor()->trace();
int control_streams = 0;
int subgroup_streams = 0;
for (const quic_trace::StreamAnnotation& annotation :
- client_->trace_visitor()->trace()->stream_annotations()) {
+ trace.stream_annotations()) {
if (annotation.moqt_control_stream()) {
++control_streams;
}
@@ -940,6 +945,21 @@
}
EXPECT_EQ(control_streams, 1);
EXPECT_EQ(subgroup_streams, 1);
+
+ int objects_enqueued = 0;
+ for (const quic_trace::Event& event : trace.events()) {
+ if (event.event_type() == quic_trace::EventType::MOQT_OBJECT_ENQUEUED) {
+ ++objects_enqueued;
+ ASSERT_TRUE(event.has_moqt_object());
+ ASSERT_TRUE(event.moqt_object().has_group_id());
+ ASSERT_TRUE(event.moqt_object().has_object_id());
+ EXPECT_EQ(event.moqt_object().group_id(), 0);
+ EXPECT_EQ(event.moqt_object().object_id(), 0);
+ EXPECT_EQ(event.moqt_object().payload_size(), kObjectPayload.size());
+ EXPECT_TRUE(event.has_transport_state());
+ }
+ }
+ EXPECT_EQ(objects_enqueued, 1);
}
} // namespace
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 2dae601..9b48d78 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -1981,11 +1981,13 @@
QUIC_DLOG(INFO) << ENDPOINT << "Created subscription for "
<< subscribe.full_track_name;
session_->subscribed_track_names_.insert(subscribe.full_track_name);
+ session_->trace_recorder_.StartRecordingTrack(track_alias, track_publisher);
}
MoqtSession::PublishedSubscription::~PublishedSubscription() {
session_->subscribed_track_names_.erase(track_publisher_->GetTrackName());
track_publisher_->RemoveObjectListener(this);
+ session_->trace_recorder_.StopRecordingTrack(track_alias_);
}
SendStreamMap& MoqtSession::PublishedSubscription::stream_map() {
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index 28c8297..db35e5e 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -794,6 +794,8 @@
bool sent_goaway_ = false;
bool received_goaway_ = false;
+ MoqtTraceRecorder trace_recorder_;
+
// Upstream SUBSCRIBE state.
// Upstream SUBSCRIBEs and FETCHes, indexed by subscribe_id.
absl::flat_hash_map<uint64_t, std::unique_ptr<RemoteTrack>> upstream_by_id_;
@@ -874,8 +876,6 @@
// the first object of group n+1 arrives.
bool alternate_delivery_timeout_ = false;
- MoqtTraceRecorder trace_recorder_;
-
quiche::QuicheWeakPtrFactory<MoqtSessionInterface> weak_ptr_factory_;
// Must be last. Token used to make sure that the streams do not call into
diff --git a/quiche/quic/moqt/moqt_trace_recorder.cc b/quiche/quic/moqt/moqt_trace_recorder.cc
index 549d6ac..7ff7ff8 100644
--- a/quiche/quic/moqt/moqt_trace_recorder.cc
+++ b/quiche/quic/moqt/moqt_trace_recorder.cc
@@ -4,20 +4,32 @@
#include "quiche/quic/moqt/moqt_trace_recorder.h"
+#include <cstddef>
#include <cstdint>
+#include <memory>
+#include <optional>
+#include <utility>
+#include "absl/hash/hash.h"
#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_object.h"
+#include "quiche/quic/moqt/moqt_priority.h"
+#include "quiche/quic/moqt/moqt_publisher.h"
#include "quic_trace/quic_trace.pb.h"
+#include "quiche/common/platform/api/quiche_logging.h"
#include "quiche/web_transport/web_transport.h"
namespace moqt {
+using ::quic_trace::EventType;
+
void MoqtTraceRecorder::RecordControlStreamCreated(
webtransport::StreamId stream_id) {
- if (trace_ == nullptr) {
+ if (parent_ == nullptr) {
return;
}
- quic_trace::StreamAnnotation* annotation = trace_->add_stream_annotations();
+ quic_trace::StreamAnnotation* annotation =
+ parent_->trace()->add_stream_annotations();
annotation->set_stream_id(stream_id);
annotation->set_moqt_control_stream(true);
}
@@ -25,10 +37,11 @@
void MoqtTraceRecorder::RecordSubgroupStreamCreated(
webtransport::StreamId stream_id, uint64_t track_alias,
DataStreamIndex index) {
- if (trace_ == nullptr) {
+ if (parent_ == nullptr) {
return;
}
- quic_trace::StreamAnnotation* annotation = trace_->add_stream_annotations();
+ quic_trace::StreamAnnotation* annotation =
+ parent_->trace()->add_stream_annotations();
annotation->set_stream_id(stream_id);
annotation->mutable_moqt_subgroup_stream()->set_track_alias(track_alias);
annotation->mutable_moqt_subgroup_stream()->set_group_id(index.group);
@@ -37,22 +50,91 @@
void MoqtTraceRecorder::RecordFetchStreamCreated(
webtransport::StreamId stream_id) {
- if (trace_ == nullptr) {
+ if (parent_ == nullptr) {
return;
}
- quic_trace::StreamAnnotation* annotation = trace_->add_stream_annotations();
+ quic_trace::StreamAnnotation* annotation =
+ parent_->trace()->add_stream_annotations();
annotation->set_stream_id(stream_id);
annotation->mutable_moqt_fetch_stream();
}
void MoqtTraceRecorder::RecordProbeStreamCreated(
webtransport::StreamId stream_id, uint64_t probe_id) {
- if (trace_ == nullptr) {
+ if (parent_ == nullptr) {
return;
}
- quic_trace::StreamAnnotation* annotation = trace_->add_stream_annotations();
+ quic_trace::StreamAnnotation* annotation =
+ parent_->trace()->add_stream_annotations();
annotation->set_stream_id(stream_id);
annotation->mutable_moqt_probe_stream()->set_probe_id(probe_id);
}
+quic_trace::Event* MoqtTraceRecorder::AddEvent() {
+ quic_trace::Event* event = parent_->trace()->add_events();
+ event->set_time_us(parent_->NowInRecordedFormat());
+ return event;
+}
+
+MoqtTraceRecorder::Track::Track(MoqtTraceRecorder* recorder,
+ std::shared_ptr<MoqtTrackPublisher> publisher,
+ uint64_t track_alias)
+ : recorder_(recorder),
+ publisher_(std::move(publisher)),
+ track_alias_(track_alias) {
+ publisher_->AddObjectListener(this);
+}
+
+MoqtTraceRecorder::Track::~Track() { publisher_->RemoveObjectListener(this); }
+
+void MoqtTraceRecorder::Track::OnNewObjectAvailable(
+ Location sequence, uint64_t subgroup, MoqtPriority publisher_priority) {
+ if (recorder_->parent_ == nullptr) {
+ return;
+ }
+ quic_trace::Event* event = recorder_->AddEvent();
+ event->set_event_type(EventType::MOQT_OBJECT_ENQUEUED);
+ recorder_->parent_->PopulateTransportState(event->mutable_transport_state());
+
+ quic_trace::MoqtObject* object = event->mutable_moqt_object();
+ object->set_track_alias(track_alias_);
+ object->set_group_id(sequence.group);
+ object->set_object_id(sequence.object);
+ object->set_subgroup_id(subgroup);
+ object->set_publisher_priority(publisher_priority);
+
+ std::optional<PublishedObject> object_copy =
+ publisher_->GetCachedObject(sequence.group, subgroup, sequence.object);
+ if (object_copy.has_value() && object_copy->metadata.location == sequence) {
+ object->set_payload_size(object_copy->payload.length());
+ } else {
+ QUICHE_DLOG(WARNING) << "Track " << track_alias_ << " has marked "
+ << sequence
+ << " as enqueued, but GetCachedObject was not able to "
+ "return the said object";
+ }
+}
+
+size_t MoqtTraceRecorder::TrackAliasHash::operator()(
+ uint64_t track_alias) const {
+ return absl::HashOf(track_alias);
+}
+
+void MoqtTraceRecorder::StartRecordingTrack(
+ uint64_t track_alias, std::shared_ptr<MoqtTrackPublisher> publisher) {
+ if (parent_ == nullptr) {
+ return;
+ }
+ auto [it, added] = tracks_.emplace(this, std::move(publisher), track_alias);
+ QUICHE_DCHECK(added);
+}
+
+void MoqtTraceRecorder::StopRecordingTrack(uint64_t track_alias) {
+ if (parent_ == nullptr) {
+ return;
+ }
+ size_t erased = tracks_.erase(track_alias);
+ QUICHE_DCHECK_EQ(erased, 1);
+}
+
} // namespace moqt
diff --git a/quiche/quic/moqt/moqt_trace_recorder.h b/quiche/quic/moqt/moqt_trace_recorder.h
index 80c84bb..57c7dbd 100644
--- a/quiche/quic/moqt/moqt_trace_recorder.h
+++ b/quiche/quic/moqt/moqt_trace_recorder.h
@@ -5,10 +5,16 @@
#ifndef QUICHE_QUIC_MOQT_MOQT_TRACE_RECORDER_H_
#define QUICHE_QUIC_MOQT_MOQT_TRACE_RECORDER_H_
+#include <cstddef>
#include <cstdint>
+#include <memory>
#include "absl/base/nullability.h"
+#include "absl/container/node_hash_set.h"
+#include "quiche/quic/core/quic_trace_visitor.h"
#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_priority.h"
+#include "quiche/quic/moqt/moqt_publisher.h"
#include "quic_trace/quic_trace.pb.h"
#include "quiche/web_transport/web_transport.h"
@@ -18,16 +24,18 @@
// wrapped trace can be nullptr, in which case no recording takes place.
class MoqtTraceRecorder {
public:
- MoqtTraceRecorder() : trace_(nullptr) {}
- explicit MoqtTraceRecorder(quic_trace::Trace* absl_nullable trace)
- : trace_(trace) {}
+ MoqtTraceRecorder() : parent_(nullptr) {}
+ explicit MoqtTraceRecorder(quic::QuicTraceVisitor* absl_nullable parent)
+ : parent_(parent) {}
MoqtTraceRecorder(const MoqtTraceRecorder&) = delete;
MoqtTraceRecorder(MoqtTraceRecorder&&) = delete;
MoqtTraceRecorder& operator=(const MoqtTraceRecorder&) = delete;
MoqtTraceRecorder& operator=(MoqtTraceRecorder&&) = delete;
- void set_trace(quic_trace::Trace* absl_nullable trace) { trace_ = trace; }
+ void SetParentRecorder(quic::QuicTraceVisitor* absl_nullable parent) {
+ parent_ = parent;
+ }
// Annotates the specified stream as the MOQT control stream.
void RecordControlStreamCreated(webtransport::StreamId stream_id);
@@ -43,8 +51,70 @@
void RecordProbeStreamCreated(webtransport::StreamId stream_id,
uint64_t probe_id);
+ // Records the track-related events by registering the recorder as a listener
+ // of `publisher`.
+ void StartRecordingTrack(uint64_t track_alias,
+ std::shared_ptr<MoqtTrackPublisher> publisher);
+ // Removes a previously added track.
+ void StopRecordingTrack(uint64_t track_alias);
+
private:
- quic_trace::Trace* absl_nullable trace_;
+ // Visitor that records events for a specific published track.
+ class Track : public MoqtObjectListener {
+ public:
+ Track(MoqtTraceRecorder* recorder,
+ std::shared_ptr<MoqtTrackPublisher> publisher, uint64_t track_alias);
+ ~Track();
+
+ Track(const Track&) = delete;
+ Track(Track&&) = delete;
+ Track& operator=(const Track&) = delete;
+ Track& operator=(Track&&) = delete;
+
+ // MoqtObjectListener implementation.
+ void OnSubscribeAccepted() override {}
+ void OnSubscribeRejected(MoqtSubscribeErrorReason reason) override {}
+ void OnNewObjectAvailable(Location sequence, uint64_t subgroup,
+ MoqtPriority publisher_priority) override;
+ void OnNewFinAvailable(Location final_object_in_subgroup,
+ uint64_t subgroup_id) override {}
+ void OnSubgroupAbandoned(
+ uint64_t group, uint64_t subgroup,
+ webtransport::StreamErrorCode error_code) override {}
+ void OnGroupAbandoned(uint64_t group_id) override {}
+ void OnTrackPublisherGone() override {}
+
+ uint64_t track_alias() const { return track_alias_; }
+
+ private:
+ MoqtTraceRecorder* const recorder_;
+ std::shared_ptr<MoqtTrackPublisher> const publisher_;
+ const uint64_t track_alias_;
+ };
+
+ // Index Track objects by track_alias.
+ struct TrackAliasEq {
+ using is_transparent = void;
+ bool operator()(const Track& a, const Track& b) const {
+ return a.track_alias() == b.track_alias();
+ }
+ bool operator()(const Track& a, uint64_t b) const {
+ return a.track_alias() == b;
+ }
+ };
+ struct TrackAliasHash {
+ using is_transparent = void;
+ size_t operator()(uint64_t track_alias) const;
+ size_t operator()(const Track& track) const {
+ return (*this)(track.track_alias());
+ }
+ };
+
+ // Adds a new event to the trace, and populates the timestamp.
+ quic_trace::Event* AddEvent();
+
+ quic::QuicTraceVisitor* absl_nullable parent_;
+ absl::node_hash_set<Track, TrackAliasHash, TrackAliasEq> tracks_;
};
} // namespace moqt
diff --git a/quiche/quic/moqt/test_tools/moqt_simulator.cc b/quiche/quic/moqt/test_tools/moqt_simulator.cc
index c3ba87b..4c598d6 100644
--- a/quiche/quic/moqt/test_tools/moqt_simulator.cc
+++ b/quiche/quic/moqt/test_tools/moqt_simulator.cc
@@ -280,8 +280,8 @@
}
client_endpoint_.RecordTrace();
QUICHE_DCHECK(client_endpoint_.trace_visitor() != nullptr);
- client_endpoint_.session()->trace_recorder().set_trace(
- client_endpoint_.trace_visitor()->trace());
+ client_endpoint_.session()->trace_recorder().SetParentRecorder(
+ client_endpoint_.trace_visitor());
}
std::string MoqtSimulator::GetClientSessionCongestionControl() {