Record objects enqueued in MOQT traces.

PiperOrigin-RevId: 823713706
diff --git a/quiche/quic/core/quic_trace_visitor.cc b/quiche/quic/core/quic_trace_visitor.cc
index 3b5131a..e04def2 100644
--- a/quiche/quic/core/quic_trace_visitor.cc
+++ b/quiche/quic/core/quic_trace_visitor.cc
@@ -4,6 +4,7 @@
 
 #include "quiche/quic/core/quic_trace_visitor.h"
 
+#include <cstdint>
 #include <string>
 
 #include "quiche/quic/core/quic_types.h"
@@ -287,8 +288,7 @@
 
 void QuicTraceVisitor::OnApplicationLimited() {
   quic_trace::Event* event = trace_.add_events();
-  event->set_time_us(
-      ConvertTimestampToRecordedFormat(connection_->clock()->ApproximateNow()));
+  event->set_time_us(NowInRecordedFormat());
   event->set_event_type(quic_trace::APPLICATION_LIMITED);
 }
 
@@ -297,8 +297,7 @@
                                                  QuicByteCount /*old_cwnd*/,
                                                  QuicByteCount /*new_cwnd*/) {
   quic_trace::Event* event = trace_.add_events();
-  event->set_time_us(
-      ConvertTimestampToRecordedFormat(connection_->clock()->ApproximateNow()));
+  event->set_time_us(NowInRecordedFormat());
   event->set_event_type(quic_trace::EXTERNAL_PARAMETERS);
 
   quic_trace::ExternalNetworkParameters* parameters =
@@ -347,4 +346,9 @@
   }
 }
 
+uint64_t QuicTraceVisitor::NowInRecordedFormat() {
+  return ConvertTimestampToRecordedFormat(
+      connection_->clock()->ApproximateNow());
+}
+
 }  // namespace quic
diff --git a/quiche/quic/core/quic_trace_visitor.h b/quiche/quic/core/quic_trace_visitor.h
index 3307dd0..a129797 100644
--- a/quiche/quic/core/quic_trace_visitor.h
+++ b/quiche/quic/core/quic_trace_visitor.h
@@ -5,9 +5,19 @@
 #ifndef QUICHE_QUIC_CORE_QUIC_TRACE_VISITOR_H_
 #define QUICHE_QUIC_CORE_QUIC_TRACE_VISITOR_H_
 
+#include <cstdint>
+
+#include "quiche/quic/core/frames/quic_ack_frame.h"
+#include "quiche/quic/core/frames/quic_frame.h"
+#include "quiche/quic/core/frames/quic_window_update_frame.h"
+#include "quiche/quic/core/quic_bandwidth.h"
 #include "quiche/quic/core/quic_connection.h"
+#include "quiche/quic/core/quic_packet_number.h"
+#include "quiche/quic/core/quic_time.h"
 #include "quiche/quic/core/quic_types.h"
+#include "quiche/quic/core/quic_versions.h"
 #include "quic_trace/quic_trace.pb.h"
+#include "quiche/common/platform/api/quiche_export.h"
 
 namespace quic {
 
@@ -54,17 +64,23 @@
   // finished.
   quic_trace::Trace* trace() { return &trace_; }
 
- private:
   // Converts QuicTime into a microsecond delta w.r.t. the beginning of the
   // connection.
   uint64_t ConvertTimestampToRecordedFormat(QuicTime timestamp);
-  // Populates a quic_trace::Frame message from |frame|.
-  void PopulateFrameInfo(const QuicFrame& frame,
-                         quic_trace::Frame* frame_record);
+
+  // Returns the current trace timestamp (i.e. number of microseconds since the
+  // start of the connection).
+  uint64_t NowInRecordedFormat();
+
   // Populates a quic_trace::TransportState message from the associated
   // connection.
   void PopulateTransportState(quic_trace::TransportState* state);
 
+ private:
+  // Populates a quic_trace::Frame message from |frame|.
+  void PopulateFrameInfo(const QuicFrame& frame,
+                         quic_trace::Frame* frame_record);
+
   quic_trace::Trace trace_;
   const QuicConnection* connection_;
   const QuicTime start_time_;
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc
index f870f94..66f0fd1 100644
--- a/quiche/quic/moqt/moqt_integration_test.cc
+++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -8,6 +8,7 @@
 #include <string>
 #include <utility>
 #include <variant>
+#include <vector>
 
 #include "absl/strings/str_cat.h"
 #include "absl/strings/string_view.h"
@@ -34,12 +35,14 @@
 #include "quiche/quic/test_tools/simulator/test_harness.h"
 #include "quic_trace/quic_trace.pb.h"
 #include "quiche/common/platform/api/quiche_test.h"
+#include "quiche/common/quiche_mem_slice.h"
 
 namespace moqt::test {
 
 namespace {
 
 using ::quic::test::MemSliceFromString;
+using ::quiche::QuicheMemSlice;
 using ::testing::_;
 using ::testing::Assign;
 using ::testing::Return;
@@ -64,8 +67,8 @@
         test_harness_.simulator().GetClock();
 
     client_->RecordTrace();
-    client_->session()->trace_recorder().set_trace(
-        client_->trace_visitor()->trace());
+    client_->session()->trace_recorder().SetParentRecorder(
+        client_->trace_visitor());
   }
 
   void WireUpEndpoints() { test_harness_.WireUpEndpoints(); }
@@ -891,6 +894,7 @@
 }
 
 TEST_F(MoqtIntegrationTest, RecordTrace) {
+  constexpr absl::string_view kObjectPayload = "object";
   EstablishSession();
   MoqtKnownTrackPublisher publisher;
   client_->session()->set_publisher(&publisher);
@@ -912,23 +916,24 @@
       test_harness_.RunUntilWithDefaultTimeout([&]() { return subscribed; });
   EXPECT_TRUE(success);
 
-  queue->AddObject(MemSliceFromString("object"), /*key=*/true);
+  queue->AddObject(QuicheMemSlice::Copy(kObjectPayload), /*key=*/true);
   int received = 0;
   EXPECT_CALL(subscribe_visitor_,
               OnObjectFragment(_,
                                MetadataLocationAndStatus(
                                    Location{0, 0}, MoqtObjectStatus::kNormal),
-                               "object", true))
+                               kObjectPayload, true))
       .WillOnce([&] { ++received; });
 
   success =
       test_harness_.RunUntilWithDefaultTimeout([&]() { return received >= 1; });
   EXPECT_TRUE(success);
+  const quic_trace::Trace& trace = *client_->trace_visitor()->trace();
 
   int control_streams = 0;
   int subgroup_streams = 0;
   for (const quic_trace::StreamAnnotation& annotation :
-       client_->trace_visitor()->trace()->stream_annotations()) {
+       trace.stream_annotations()) {
     if (annotation.moqt_control_stream()) {
       ++control_streams;
     }
@@ -940,6 +945,21 @@
   }
   EXPECT_EQ(control_streams, 1);
   EXPECT_EQ(subgroup_streams, 1);
+
+  int objects_enqueued = 0;
+  for (const quic_trace::Event& event : trace.events()) {
+    if (event.event_type() == quic_trace::EventType::MOQT_OBJECT_ENQUEUED) {
+      ++objects_enqueued;
+      ASSERT_TRUE(event.has_moqt_object());
+      ASSERT_TRUE(event.moqt_object().has_group_id());
+      ASSERT_TRUE(event.moqt_object().has_object_id());
+      EXPECT_EQ(event.moqt_object().group_id(), 0);
+      EXPECT_EQ(event.moqt_object().object_id(), 0);
+      EXPECT_EQ(event.moqt_object().payload_size(), kObjectPayload.size());
+      EXPECT_TRUE(event.has_transport_state());
+    }
+  }
+  EXPECT_EQ(objects_enqueued, 1);
 }
 
 }  // namespace
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 2dae601..9b48d78 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -1981,11 +1981,13 @@
   QUIC_DLOG(INFO) << ENDPOINT << "Created subscription for "
                   << subscribe.full_track_name;
   session_->subscribed_track_names_.insert(subscribe.full_track_name);
+  session_->trace_recorder_.StartRecordingTrack(track_alias, track_publisher);
 }
 
 MoqtSession::PublishedSubscription::~PublishedSubscription() {
   session_->subscribed_track_names_.erase(track_publisher_->GetTrackName());
   track_publisher_->RemoveObjectListener(this);
+  session_->trace_recorder_.StopRecordingTrack(track_alias_);
 }
 
 SendStreamMap& MoqtSession::PublishedSubscription::stream_map() {
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index 28c8297..db35e5e 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -794,6 +794,8 @@
   bool sent_goaway_ = false;
   bool received_goaway_ = false;
 
+  MoqtTraceRecorder trace_recorder_;
+
   // Upstream SUBSCRIBE state.
   // Upstream SUBSCRIBEs and FETCHes, indexed by subscribe_id.
   absl::flat_hash_map<uint64_t, std::unique_ptr<RemoteTrack>> upstream_by_id_;
@@ -874,8 +876,6 @@
   // the first object of group n+1 arrives.
   bool alternate_delivery_timeout_ = false;
 
-  MoqtTraceRecorder trace_recorder_;
-
   quiche::QuicheWeakPtrFactory<MoqtSessionInterface> weak_ptr_factory_;
 
   // Must be last.  Token used to make sure that the streams do not call into
diff --git a/quiche/quic/moqt/moqt_trace_recorder.cc b/quiche/quic/moqt/moqt_trace_recorder.cc
index 549d6ac..7ff7ff8 100644
--- a/quiche/quic/moqt/moqt_trace_recorder.cc
+++ b/quiche/quic/moqt/moqt_trace_recorder.cc
@@ -4,20 +4,32 @@
 
 #include "quiche/quic/moqt/moqt_trace_recorder.h"
 
+#include <cstddef>
 #include <cstdint>
+#include <memory>
+#include <optional>
+#include <utility>
 
+#include "absl/hash/hash.h"
 #include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_object.h"
+#include "quiche/quic/moqt/moqt_priority.h"
+#include "quiche/quic/moqt/moqt_publisher.h"
 #include "quic_trace/quic_trace.pb.h"
+#include "quiche/common/platform/api/quiche_logging.h"
 #include "quiche/web_transport/web_transport.h"
 
 namespace moqt {
 
+using ::quic_trace::EventType;
+
 void MoqtTraceRecorder::RecordControlStreamCreated(
     webtransport::StreamId stream_id) {
-  if (trace_ == nullptr) {
+  if (parent_ == nullptr) {
     return;
   }
-  quic_trace::StreamAnnotation* annotation = trace_->add_stream_annotations();
+  quic_trace::StreamAnnotation* annotation =
+      parent_->trace()->add_stream_annotations();
   annotation->set_stream_id(stream_id);
   annotation->set_moqt_control_stream(true);
 }
@@ -25,10 +37,11 @@
 void MoqtTraceRecorder::RecordSubgroupStreamCreated(
     webtransport::StreamId stream_id, uint64_t track_alias,
     DataStreamIndex index) {
-  if (trace_ == nullptr) {
+  if (parent_ == nullptr) {
     return;
   }
-  quic_trace::StreamAnnotation* annotation = trace_->add_stream_annotations();
+  quic_trace::StreamAnnotation* annotation =
+      parent_->trace()->add_stream_annotations();
   annotation->set_stream_id(stream_id);
   annotation->mutable_moqt_subgroup_stream()->set_track_alias(track_alias);
   annotation->mutable_moqt_subgroup_stream()->set_group_id(index.group);
@@ -37,22 +50,91 @@
 
 void MoqtTraceRecorder::RecordFetchStreamCreated(
     webtransport::StreamId stream_id) {
-  if (trace_ == nullptr) {
+  if (parent_ == nullptr) {
     return;
   }
-  quic_trace::StreamAnnotation* annotation = trace_->add_stream_annotations();
+  quic_trace::StreamAnnotation* annotation =
+      parent_->trace()->add_stream_annotations();
   annotation->set_stream_id(stream_id);
   annotation->mutable_moqt_fetch_stream();
 }
 
 void MoqtTraceRecorder::RecordProbeStreamCreated(
     webtransport::StreamId stream_id, uint64_t probe_id) {
-  if (trace_ == nullptr) {
+  if (parent_ == nullptr) {
     return;
   }
-  quic_trace::StreamAnnotation* annotation = trace_->add_stream_annotations();
+  quic_trace::StreamAnnotation* annotation =
+      parent_->trace()->add_stream_annotations();
   annotation->set_stream_id(stream_id);
   annotation->mutable_moqt_probe_stream()->set_probe_id(probe_id);
 }
 
+quic_trace::Event* MoqtTraceRecorder::AddEvent() {
+  quic_trace::Event* event = parent_->trace()->add_events();
+  event->set_time_us(parent_->NowInRecordedFormat());
+  return event;
+}
+
+MoqtTraceRecorder::Track::Track(MoqtTraceRecorder* recorder,
+                                std::shared_ptr<MoqtTrackPublisher> publisher,
+                                uint64_t track_alias)
+    : recorder_(recorder),
+      publisher_(std::move(publisher)),
+      track_alias_(track_alias) {
+  publisher_->AddObjectListener(this);
+}
+
+MoqtTraceRecorder::Track::~Track() { publisher_->RemoveObjectListener(this); }
+
+void MoqtTraceRecorder::Track::OnNewObjectAvailable(
+    Location sequence, uint64_t subgroup, MoqtPriority publisher_priority) {
+  if (recorder_->parent_ == nullptr) {
+    return;
+  }
+  quic_trace::Event* event = recorder_->AddEvent();
+  event->set_event_type(EventType::MOQT_OBJECT_ENQUEUED);
+  recorder_->parent_->PopulateTransportState(event->mutable_transport_state());
+
+  quic_trace::MoqtObject* object = event->mutable_moqt_object();
+  object->set_track_alias(track_alias_);
+  object->set_group_id(sequence.group);
+  object->set_object_id(sequence.object);
+  object->set_subgroup_id(subgroup);
+  object->set_publisher_priority(publisher_priority);
+
+  std::optional<PublishedObject> object_copy =
+      publisher_->GetCachedObject(sequence.group, subgroup, sequence.object);
+  if (object_copy.has_value() && object_copy->metadata.location == sequence) {
+    object->set_payload_size(object_copy->payload.length());
+  } else {
+    QUICHE_DLOG(WARNING) << "Track " << track_alias_ << " has marked "
+                         << sequence
+                         << " as enqueued, but GetCachedObject was not able to "
+                            "return the said object";
+  }
+}
+
+size_t MoqtTraceRecorder::TrackAliasHash::operator()(
+    uint64_t track_alias) const {
+  return absl::HashOf(track_alias);
+}
+
+void MoqtTraceRecorder::StartRecordingTrack(
+    uint64_t track_alias, std::shared_ptr<MoqtTrackPublisher> publisher) {
+  if (parent_ == nullptr) {
+    return;
+  }
+  auto [it, added] = tracks_.emplace(this, std::move(publisher), track_alias);
+  QUICHE_DCHECK(added);
+}
+
+void MoqtTraceRecorder::StopRecordingTrack(uint64_t track_alias) {
+  if (parent_ == nullptr) {
+    return;
+  }
+  size_t erased = tracks_.erase(track_alias);
+  QUICHE_DCHECK_EQ(erased, 1);
+}
+
 }  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_trace_recorder.h b/quiche/quic/moqt/moqt_trace_recorder.h
index 80c84bb..57c7dbd 100644
--- a/quiche/quic/moqt/moqt_trace_recorder.h
+++ b/quiche/quic/moqt/moqt_trace_recorder.h
@@ -5,10 +5,16 @@
 #ifndef QUICHE_QUIC_MOQT_MOQT_TRACE_RECORDER_H_
 #define QUICHE_QUIC_MOQT_MOQT_TRACE_RECORDER_H_
 
+#include <cstddef>
 #include <cstdint>
+#include <memory>
 
 #include "absl/base/nullability.h"
+#include "absl/container/node_hash_set.h"
+#include "quiche/quic/core/quic_trace_visitor.h"
 #include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_priority.h"
+#include "quiche/quic/moqt/moqt_publisher.h"
 #include "quic_trace/quic_trace.pb.h"
 #include "quiche/web_transport/web_transport.h"
 
@@ -18,16 +24,18 @@
 // wrapped trace can be nullptr, in which case no recording takes place.
 class MoqtTraceRecorder {
  public:
-  MoqtTraceRecorder() : trace_(nullptr) {}
-  explicit MoqtTraceRecorder(quic_trace::Trace* absl_nullable trace)
-      : trace_(trace) {}
+  MoqtTraceRecorder() : parent_(nullptr) {}
+  explicit MoqtTraceRecorder(quic::QuicTraceVisitor* absl_nullable parent)
+      : parent_(parent) {}
 
   MoqtTraceRecorder(const MoqtTraceRecorder&) = delete;
   MoqtTraceRecorder(MoqtTraceRecorder&&) = delete;
   MoqtTraceRecorder& operator=(const MoqtTraceRecorder&) = delete;
   MoqtTraceRecorder& operator=(MoqtTraceRecorder&&) = delete;
 
-  void set_trace(quic_trace::Trace* absl_nullable trace) { trace_ = trace; }
+  void SetParentRecorder(quic::QuicTraceVisitor* absl_nullable parent) {
+    parent_ = parent;
+  }
 
   // Annotates the specified stream as the MOQT control stream.
   void RecordControlStreamCreated(webtransport::StreamId stream_id);
@@ -43,8 +51,70 @@
   void RecordProbeStreamCreated(webtransport::StreamId stream_id,
                                 uint64_t probe_id);
 
+  // Records the track-related events by registering the recorder as a listener
+  // of `publisher`.
+  void StartRecordingTrack(uint64_t track_alias,
+                           std::shared_ptr<MoqtTrackPublisher> publisher);
+  // Removes a previously added track.
+  void StopRecordingTrack(uint64_t track_alias);
+
  private:
-  quic_trace::Trace* absl_nullable trace_;
+  // Visitor that records events for a specific published track.
+  class Track : public MoqtObjectListener {
+   public:
+    Track(MoqtTraceRecorder* recorder,
+          std::shared_ptr<MoqtTrackPublisher> publisher, uint64_t track_alias);
+    ~Track();
+
+    Track(const Track&) = delete;
+    Track(Track&&) = delete;
+    Track& operator=(const Track&) = delete;
+    Track& operator=(Track&&) = delete;
+
+    // MoqtObjectListener implementation.
+    void OnSubscribeAccepted() override {}
+    void OnSubscribeRejected(MoqtSubscribeErrorReason reason) override {}
+    void OnNewObjectAvailable(Location sequence, uint64_t subgroup,
+                              MoqtPriority publisher_priority) override;
+    void OnNewFinAvailable(Location final_object_in_subgroup,
+                           uint64_t subgroup_id) override {}
+    void OnSubgroupAbandoned(
+        uint64_t group, uint64_t subgroup,
+        webtransport::StreamErrorCode error_code) override {}
+    void OnGroupAbandoned(uint64_t group_id) override {}
+    void OnTrackPublisherGone() override {}
+
+    uint64_t track_alias() const { return track_alias_; }
+
+   private:
+    MoqtTraceRecorder* const recorder_;
+    std::shared_ptr<MoqtTrackPublisher> const publisher_;
+    const uint64_t track_alias_;
+  };
+
+  // Index Track objects by track_alias.
+  struct TrackAliasEq {
+    using is_transparent = void;
+    bool operator()(const Track& a, const Track& b) const {
+      return a.track_alias() == b.track_alias();
+    }
+    bool operator()(const Track& a, uint64_t b) const {
+      return a.track_alias() == b;
+    }
+  };
+  struct TrackAliasHash {
+    using is_transparent = void;
+    size_t operator()(uint64_t track_alias) const;
+    size_t operator()(const Track& track) const {
+      return (*this)(track.track_alias());
+    }
+  };
+
+  // Adds a new event to the trace, and populates the timestamp.
+  quic_trace::Event* AddEvent();
+
+  quic::QuicTraceVisitor* absl_nullable parent_;
+  absl::node_hash_set<Track, TrackAliasHash, TrackAliasEq> tracks_;
 };
 
 }  // namespace moqt
diff --git a/quiche/quic/moqt/test_tools/moqt_simulator.cc b/quiche/quic/moqt/test_tools/moqt_simulator.cc
index c3ba87b..4c598d6 100644
--- a/quiche/quic/moqt/test_tools/moqt_simulator.cc
+++ b/quiche/quic/moqt/test_tools/moqt_simulator.cc
@@ -280,8 +280,8 @@
   }
   client_endpoint_.RecordTrace();
   QUICHE_DCHECK(client_endpoint_.trace_visitor() != nullptr);
-  client_endpoint_.session()->trace_recorder().set_trace(
-      client_endpoint_.trace_visitor()->trace());
+  client_endpoint_.session()->trace_recorder().SetParentRecorder(
+      client_endpoint_.trace_visitor());
 }
 
 std::string MoqtSimulator::GetClientSessionCongestionControl() {