Record MOQT-specific stream annotations into QUIC traces if available.

PiperOrigin-RevId: 817917687
diff --git a/build/source_list.bzl b/build/source_list.bzl
index daff02a..5180db2 100644
--- a/build/source_list.bzl
+++ b/build/source_list.bzl
@@ -1573,6 +1573,7 @@
     "quic/moqt/moqt_session_callbacks.h",
     "quic/moqt/moqt_session_interface.h",
     "quic/moqt/moqt_subscribe_windows.h",
+    "quic/moqt/moqt_trace_recorder.h",
     "quic/moqt/moqt_track.h",
     "quic/moqt/relay_namespace_tree.h",
     "quic/moqt/session_namespace_tree.h",
@@ -1598,6 +1599,7 @@
     "quic/moqt/moqt_relay_track_publisher.cc",
     "quic/moqt/moqt_session.cc",
     "quic/moqt/moqt_subscribe_windows.cc",
+    "quic/moqt/moqt_trace_recorder.cc",
     "quic/moqt/moqt_track.cc",
     "quic/moqt/tools/chat_client.cc",
     "quic/moqt/tools/chat_server.cc",
diff --git a/build/source_list.gni b/build/source_list.gni
index 93b2900..a8fbd1c 100644
--- a/build/source_list.gni
+++ b/build/source_list.gni
@@ -1577,6 +1577,7 @@
     "src/quiche/quic/moqt/moqt_session_callbacks.h",
     "src/quiche/quic/moqt/moqt_session_interface.h",
     "src/quiche/quic/moqt/moqt_subscribe_windows.h",
+    "src/quiche/quic/moqt/moqt_trace_recorder.h",
     "src/quiche/quic/moqt/moqt_track.h",
     "src/quiche/quic/moqt/relay_namespace_tree.h",
     "src/quiche/quic/moqt/session_namespace_tree.h",
@@ -1602,6 +1603,7 @@
     "src/quiche/quic/moqt/moqt_relay_track_publisher.cc",
     "src/quiche/quic/moqt/moqt_session.cc",
     "src/quiche/quic/moqt/moqt_subscribe_windows.cc",
+    "src/quiche/quic/moqt/moqt_trace_recorder.cc",
     "src/quiche/quic/moqt/moqt_track.cc",
     "src/quiche/quic/moqt/tools/chat_client.cc",
     "src/quiche/quic/moqt/tools/chat_server.cc",
diff --git a/build/source_list.json b/build/source_list.json
index c62f5e3..5614381 100644
--- a/build/source_list.json
+++ b/build/source_list.json
@@ -1576,6 +1576,7 @@
     "quiche/quic/moqt/moqt_session_callbacks.h",
     "quiche/quic/moqt/moqt_session_interface.h",
     "quiche/quic/moqt/moqt_subscribe_windows.h",
+    "quiche/quic/moqt/moqt_trace_recorder.h",
     "quiche/quic/moqt/moqt_track.h",
     "quiche/quic/moqt/relay_namespace_tree.h",
     "quiche/quic/moqt/session_namespace_tree.h",
@@ -1601,6 +1602,7 @@
     "quiche/quic/moqt/moqt_relay_track_publisher.cc",
     "quiche/quic/moqt/moqt_session.cc",
     "quiche/quic/moqt/moqt_subscribe_windows.cc",
+    "quiche/quic/moqt/moqt_trace_recorder.cc",
     "quiche/quic/moqt/moqt_track.cc",
     "quiche/quic/moqt/tools/chat_client.cc",
     "quiche/quic/moqt/tools/chat_server.cc",
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc
index e5d3f8d..ac28942 100644
--- a/quiche/quic/moqt/moqt_integration_test.cc
+++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -31,6 +31,7 @@
 #include "quiche/quic/moqt/test_tools/moqt_simulator_harness.h"
 #include "quiche/quic/test_tools/quic_test_utils.h"
 #include "quiche/quic/test_tools/simulator/test_harness.h"
+#include "quic_trace/quic_trace.pb.h"
 #include "quiche/common/platform/api/quiche_test.h"
 
 namespace moqt::test {
@@ -60,6 +61,10 @@
     server_->session()->callbacks() = server_callbacks_.AsSessionCallbacks();
     server_->session()->callbacks().clock =
         test_harness_.simulator().GetClock();
+
+    client_->RecordTrace();
+    client_->session()->trace_recorder().set_trace(
+        client_->trace_visitor()->trace());
   }
 
   void WireUpEndpoints() { test_harness_.WireUpEndpoints(); }
@@ -842,7 +847,8 @@
   EstablishSession();
   MoqtProbeManager probe_manager(client_->session()->session(),
                                  test_harness_.simulator().GetClock(),
-                                 *test_harness_.simulator().GetAlarmFactory());
+                                 *test_harness_.simulator().GetAlarmFactory(),
+                                 &client_->session()->trace_recorder());
 
   constexpr quic::QuicBandwidth kModelBandwidth =
       quic::simulator::TestHarness::kServerBandwidth;
@@ -850,14 +856,77 @@
   constexpr quic::QuicTimeDelta kProbeTimeout =
       kModelBandwidth.TransferTime(kProbeSize) * 10;
   bool probe_done = false;
-  probe_manager.StartProbe(kProbeSize, kProbeTimeout,
-                           [&probe_done](const ProbeResult& result) {
-                             probe_done = true;
-                             EXPECT_EQ(result.status, ProbeStatus::kSuccess);
-                           });
+  std::optional<ProbeId> probe_id = probe_manager.StartProbe(
+      kProbeSize, kProbeTimeout, [&probe_done](const ProbeResult& result) {
+        probe_done = true;
+        EXPECT_EQ(result.status, ProbeStatus::kSuccess);
+      });
   bool success =
       test_harness_.RunUntilWithDefaultTimeout([&]() { return probe_done; });
   EXPECT_TRUE(success);
+
+  int probe_streams = 0;
+  for (const quic_trace::StreamAnnotation& annotation :
+       client_->trace_visitor()->trace()->stream_annotations()) {
+    if (annotation.has_moqt_probe_stream()) {
+      ++probe_streams;
+      EXPECT_EQ(probe_id, annotation.moqt_probe_stream().probe_id());
+    }
+  }
+  EXPECT_EQ(probe_streams, 1);
+}
+
+TEST_F(MoqtIntegrationTest, RecordTrace) {
+  EstablishSession();
+  MoqtKnownTrackPublisher publisher;
+  client_->session()->set_publisher(&publisher);
+
+  MockSubscribeRemoteTrackVisitor client_visitor;
+  auto queue = std::make_shared<MoqtOutgoingQueue>(
+      FullTrackName{"test", "subgroup"}, MoqtForwardingPreference::kSubgroup);
+  publisher.Add(queue);
+
+  server_->session()->SubscribeCurrentObject(FullTrackName("test", "subgroup"),
+                                             &client_visitor,
+                                             VersionSpecificParameters());
+  bool subscribed = false;
+  EXPECT_CALL(client_visitor, OnReply)
+      .WillOnce([&](const FullTrackName&,
+                    std::variant<SubscribeOkData, MoqtRequestError>) {
+        subscribed = true;
+      });
+  bool success =
+      test_harness_.RunUntilWithDefaultTimeout([&]() { return subscribed; });
+  EXPECT_TRUE(success);
+
+  queue->AddObject(MemSliceFromString("object"), /*key=*/true);
+  int received = 0;
+  EXPECT_CALL(client_visitor,
+              OnObjectFragment(_,
+                               MetadataLocationAndStatus(
+                                   Location{0, 0}, MoqtObjectStatus::kNormal),
+                               "object", true))
+      .WillOnce([&] { ++received; });
+
+  success =
+      test_harness_.RunUntilWithDefaultTimeout([&]() { return received >= 1; });
+  EXPECT_TRUE(success);
+
+  int control_streams = 0;
+  int subgroup_streams = 0;
+  for (const quic_trace::StreamAnnotation& annotation :
+       client_->trace_visitor()->trace()->stream_annotations()) {
+    if (annotation.moqt_control_stream()) {
+      ++control_streams;
+    }
+    if (annotation.has_moqt_subgroup_stream()) {
+      ++subgroup_streams;
+      EXPECT_EQ(annotation.moqt_subgroup_stream().group_id(), 0);
+      EXPECT_EQ(annotation.moqt_subgroup_stream().subgroup_id(), 0);
+    }
+  }
+  EXPECT_EQ(control_streams, 1);
+  EXPECT_EQ(subgroup_streams, 1);
 }
 
 }  // namespace
diff --git a/quiche/quic/moqt/moqt_probe_manager.cc b/quiche/quic/moqt/moqt_probe_manager.cc
index 2ad86df..3ff5bcc 100644
--- a/quiche/quic/moqt/moqt_probe_manager.cc
+++ b/quiche/quic/moqt/moqt_probe_manager.cc
@@ -42,6 +42,7 @@
   if (stream == nullptr) {
     return std::nullopt;
   }
+  trace_recorder_->RecordProbeStreamCreated(stream->GetStreamId(), id);
 
   probe_ = PendingProbe{
       id,         clock_->ApproximateNow(), clock_->ApproximateNow() + timeout,
diff --git a/quiche/quic/moqt/moqt_probe_manager.h b/quiche/quic/moqt/moqt_probe_manager.h
index 0458a92..f2d8639 100644
--- a/quiche/quic/moqt/moqt_probe_manager.h
+++ b/quiche/quic/moqt/moqt_probe_manager.h
@@ -14,6 +14,7 @@
 #include "quiche/quic/core/quic_clock.h"
 #include "quiche/quic/core/quic_time.h"
 #include "quiche/quic/core/quic_types.h"
+#include "quiche/quic/moqt/moqt_trace_recorder.h"
 #include "quiche/common/quiche_callbacks.h"
 #include "quiche/web_transport/web_transport.h"
 
@@ -68,9 +69,11 @@
  public:
   explicit MoqtProbeManager(webtransport::Session* session,
                             const quic::QuicClock* clock,
-                            quic::QuicAlarmFactory& alarm_factory)
+                            quic::QuicAlarmFactory& alarm_factory,
+                            MoqtTraceRecorder* trace_recorder)
       : session_(session),
         clock_(clock),
+        trace_recorder_(trace_recorder),
         timeout_alarm_(alarm_factory.CreateAlarm(new AlarmDelegate(this))) {}
 
   // MoqtProbeManagerInterface implementation.
@@ -140,6 +143,7 @@
   std::optional<PendingProbe> probe_;
   webtransport::Session* session_;
   const quic::QuicClock* clock_;
+  MoqtTraceRecorder* trace_recorder_;
   std::unique_ptr<quic::QuicAlarm> timeout_alarm_;
   ProbeId next_probe_id_ = 0;
 };
diff --git a/quiche/quic/moqt/moqt_probe_manager_test.cc b/quiche/quic/moqt/moqt_probe_manager_test.cc
index 9e41f9b..d267886 100644
--- a/quiche/quic/moqt/moqt_probe_manager_test.cc
+++ b/quiche/quic/moqt/moqt_probe_manager_test.cc
@@ -15,6 +15,7 @@
 #include "absl/types/span.h"
 #include "quiche/quic/core/quic_time.h"
 #include "quiche/quic/core/quic_types.h"
+#include "quiche/quic/moqt/moqt_trace_recorder.h"
 #include "quiche/quic/test_tools/mock_clock.h"
 #include "quiche/quic/test_tools/quic_test_utils.h"
 #include "quiche/common/platform/api/quiche_logging.h"
@@ -74,10 +75,12 @@
 
 class MoqtProbeManagerTest : public quiche::test::QuicheTest {
  protected:
-  MoqtProbeManagerTest() : manager_(&session_, &clock_, alarm_factory_) {}
+  MoqtProbeManagerTest()
+      : manager_(&session_, &clock_, alarm_factory_, &trace_recorder_) {}
 
   webtransport::test::MockSession session_;
   quic::MockClock clock_;
+  MoqtTraceRecorder trace_recorder_;
   quic::test::MockAlarmFactory alarm_factory_;
   MoqtProbeManager manager_;
 };
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index f9b47a2..64ba595 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -936,6 +936,7 @@
   stream_->SetPriority(
       webtransport::StreamPriority{/*send_group_id=*/kMoqtSendGroupId,
                                    /*send_order=*/kMoqtControlStreamSendOrder});
+  session->trace_recorder_.RecordControlStreamCreated(stream->GetStreamId());
 }
 
 void MoqtSession::ControlStream::OnCanRead() {
@@ -2320,6 +2321,10 @@
       next_object_(parameters.first_object),
       session_liveness_(session->liveness_token_) {
   UpdateSendOrder(subscription);
+  if (subscription.track_alias().has_value()) {
+    session->trace_recorder_.RecordSubgroupStreamCreated(
+        stream->GetStreamId(), *subscription.track_alias(), parameters.index);
+  }
 }
 
 MoqtSession::OutgoingDataStream::~OutgoingDataStream() {
@@ -2574,4 +2579,13 @@
   delivery_timeout_alarm_->Set(deadline);
 }
 
+MoqtSession::PublishedFetch::FetchStreamVisitor::FetchStreamVisitor(
+    std::shared_ptr<PublishedFetch> fetch, webtransport::Stream* stream)
+    : fetch_(fetch), stream_(stream) {
+  fetch->fetch_task()->SetObjectAvailableCallback(
+      [this]() { this->OnCanWrite(); });
+  fetch->session()->trace_recorder_.RecordFetchStreamCreated(
+      stream->GetStreamId());
+}
+
 }  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index f3e812d..14c17ff 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -31,6 +31,7 @@
 #include "quiche/quic/moqt/moqt_session_callbacks.h"
 #include "quiche/quic/moqt/moqt_session_interface.h"
 #include "quiche/quic/moqt/moqt_subscribe_windows.h"
+#include "quiche/quic/moqt/moqt_trace_recorder.h"
 #include "quiche/quic/moqt/moqt_track.h"
 #include "quiche/quic/moqt/session_namespace_tree.h"
 #include "quiche/common/platform/api/quiche_export.h"
@@ -197,6 +198,8 @@
 
   void UseAlternateDeliveryTimeout() { alternate_delivery_timeout_ = true; }
 
+  MoqtTraceRecorder& trace_recorder() { return trace_recorder_; }
+
  private:
   friend class test::MoqtSessionPeer;
 
@@ -582,11 +585,7 @@
     class FetchStreamVisitor : public webtransport::StreamVisitor {
      public:
       FetchStreamVisitor(std::shared_ptr<PublishedFetch> fetch,
-                         webtransport::Stream* stream)
-          : fetch_(fetch), stream_(stream) {
-        fetch->fetch_task()->SetObjectAvailableCallback(
-            [this]() { this->OnCanWrite(); });
-      }
+                         webtransport::Stream* stream);
       ~FetchStreamVisitor() {
         std::shared_ptr<PublishedFetch> fetch = fetch_.lock();
         if (fetch != nullptr) {
@@ -877,6 +876,8 @@
   // the first object of group n+1 arrives.
   bool alternate_delivery_timeout_ = false;
 
+  MoqtTraceRecorder trace_recorder_;
+
   quiche::QuicheWeakPtrFactory<MoqtSessionInterface> weak_ptr_factory_;
 
   // Must be last.  Token used to make sure that the streams do not call into
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index 1f32f9d..9005ed1 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -265,7 +265,7 @@
         visitor = std::move(new_visitor);
       });
   EXPECT_CALL(mock_stream_, GetStreamId())
-      .WillOnce(Return(webtransport::StreamId(4)));
+      .WillRepeatedly(Return(webtransport::StreamId(4)));
   EXPECT_CALL(mock_session_, GetStreamById(4)).WillOnce(Return(&mock_stream_));
   EXPECT_CALL(mock_stream_, visitor()).WillOnce([&] { return visitor.get(); });
   EXPECT_CALL(mock_stream_,
@@ -1919,7 +1919,7 @@
         visitor = std::move(new_visitor);
       });
   EXPECT_CALL(mock_stream_, GetStreamId())
-      .WillOnce(Return(webtransport::StreamId(4)));
+      .WillRepeatedly(Return(webtransport::StreamId(4)));
   EXPECT_CALL(mock_session_, GetStreamById(4)).WillOnce(Return(&mock_stream_));
   EXPECT_CALL(mock_stream_, visitor()).WillOnce([&] { return visitor.get(); });
   EXPECT_CALL(mock_stream_,
diff --git a/quiche/quic/moqt/moqt_trace_recorder.cc b/quiche/quic/moqt/moqt_trace_recorder.cc
new file mode 100644
index 0000000..549d6ac
--- /dev/null
+++ b/quiche/quic/moqt/moqt_trace_recorder.cc
@@ -0,0 +1,58 @@
+// Copyright 2025 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/quic/moqt/moqt_trace_recorder.h"
+
+#include <cstdint>
+
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quic_trace/quic_trace.pb.h"
+#include "quiche/web_transport/web_transport.h"
+
+namespace moqt {
+
+void MoqtTraceRecorder::RecordControlStreamCreated(
+    webtransport::StreamId stream_id) {
+  if (trace_ == nullptr) {
+    return;
+  }
+  quic_trace::StreamAnnotation* annotation = trace_->add_stream_annotations();
+  annotation->set_stream_id(stream_id);
+  annotation->set_moqt_control_stream(true);
+}
+
+void MoqtTraceRecorder::RecordSubgroupStreamCreated(
+    webtransport::StreamId stream_id, uint64_t track_alias,
+    DataStreamIndex index) {
+  if (trace_ == nullptr) {
+    return;
+  }
+  quic_trace::StreamAnnotation* annotation = trace_->add_stream_annotations();
+  annotation->set_stream_id(stream_id);
+  annotation->mutable_moqt_subgroup_stream()->set_track_alias(track_alias);
+  annotation->mutable_moqt_subgroup_stream()->set_group_id(index.group);
+  annotation->mutable_moqt_subgroup_stream()->set_subgroup_id(index.subgroup);
+}
+
+void MoqtTraceRecorder::RecordFetchStreamCreated(
+    webtransport::StreamId stream_id) {
+  if (trace_ == nullptr) {
+    return;
+  }
+  quic_trace::StreamAnnotation* annotation = trace_->add_stream_annotations();
+  annotation->set_stream_id(stream_id);
+  annotation->mutable_moqt_fetch_stream();
+}
+
+void MoqtTraceRecorder::RecordProbeStreamCreated(
+    webtransport::StreamId stream_id, uint64_t probe_id) {
+  if (trace_ == nullptr) {
+    return;
+  }
+  quic_trace::StreamAnnotation* annotation = trace_->add_stream_annotations();
+  annotation->set_stream_id(stream_id);
+  annotation->mutable_moqt_probe_stream()->set_probe_id(probe_id);
+}
+
+}  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_trace_recorder.h b/quiche/quic/moqt/moqt_trace_recorder.h
new file mode 100644
index 0000000..80c84bb
--- /dev/null
+++ b/quiche/quic/moqt/moqt_trace_recorder.h
@@ -0,0 +1,52 @@
+// Copyright 2025 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_MOQT_MOQT_TRACE_RECORDER_H_
+#define QUICHE_QUIC_MOQT_MOQT_TRACE_RECORDER_H_
+
+#include <cstdint>
+
+#include "absl/base/nullability.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quic_trace/quic_trace.pb.h"
+#include "quiche/web_transport/web_transport.h"
+
+namespace moqt {
+
+// Records MOQT-specific information into the provided QUIC trace proto.  The
+// wrapped trace can be nullptr, in which case no recording takes place.
+class MoqtTraceRecorder {
+ public:
+  MoqtTraceRecorder() : trace_(nullptr) {}
+  explicit MoqtTraceRecorder(quic_trace::Trace* absl_nullable trace)
+      : trace_(trace) {}
+
+  MoqtTraceRecorder(const MoqtTraceRecorder&) = delete;
+  MoqtTraceRecorder(MoqtTraceRecorder&&) = delete;
+  MoqtTraceRecorder& operator=(const MoqtTraceRecorder&) = delete;
+  MoqtTraceRecorder& operator=(MoqtTraceRecorder&&) = delete;
+
+  void set_trace(quic_trace::Trace* absl_nullable trace) { trace_ = trace; }
+
+  // Annotates the specified stream as the MOQT control stream.
+  void RecordControlStreamCreated(webtransport::StreamId stream_id);
+
+  // Annotates the specified stream as an MOQT subgroup data stream.
+  void RecordSubgroupStreamCreated(webtransport::StreamId stream_id,
+                                   uint64_t track_alias, DataStreamIndex index);
+
+  // Annotates the specified stream as an MOQT fetch data stream.
+  void RecordFetchStreamCreated(webtransport::StreamId stream_id);
+
+  // Annotates the specified stream as an MOQT probe stream.
+  void RecordProbeStreamCreated(webtransport::StreamId stream_id,
+                                uint64_t probe_id);
+
+ private:
+  quic_trace::Trace* absl_nullable trace_;
+};
+
+}  // namespace moqt
+
+#endif  // QUICHE_QUIC_MOQT_MOQT_TRACE_RECORDER_H_
diff --git a/quiche/quic/moqt/tools/moqt_simulator_bin.cc b/quiche/quic/moqt/tools/moqt_simulator_bin.cc
index 9bf70ec..059b59d 100644
--- a/quiche/quic/moqt/tools/moqt_simulator_bin.cc
+++ b/quiche/quic/moqt/tools/moqt_simulator_bin.cc
@@ -39,6 +39,7 @@
 #include "quiche/quic/moqt/moqt_priority.h"
 #include "quiche/quic/moqt/moqt_session.h"
 #include "quiche/quic/moqt/moqt_session_interface.h"
+#include "quiche/quic/moqt/moqt_trace_recorder.h"
 #include "quiche/quic/moqt/test_tools/moqt_simulator_harness.h"
 #include "quiche/quic/test_tools/simulator/actor.h"
 #include "quiche/quic/test_tools/simulator/link.h"
@@ -415,6 +416,9 @@
                                                timeout);
     }
     client_endpoint_.RecordTrace();
+    QUICHE_DCHECK(client_endpoint_.trace_visitor() != nullptr);
+    client_endpoint_.session()->trace_recorder().set_trace(
+        client_endpoint_.trace_visitor()->trace());
   }
 
   MoqtSession* client_session() { return client_endpoint_.session(); }
diff --git a/quiche/quic/test_tools/simulator/quic_endpoint_base.h b/quiche/quic/test_tools/simulator/quic_endpoint_base.h
index 336b2f9..797bfd6 100644
--- a/quiche/quic/test_tools/simulator/quic_endpoint_base.h
+++ b/quiche/quic/test_tools/simulator/quic_endpoint_base.h
@@ -7,6 +7,7 @@
 
 #include <memory>
 
+#include "absl/base/nullability.h"
 #include "absl/container/flat_hash_map.h"
 #include "quiche/quic/core/crypto/null_decrypter.h"
 #include "quiche/quic/core/crypto/null_encrypter.h"
@@ -69,6 +70,11 @@
   // Queue::ListenerInterface method.
   void OnPacketDequeued() override;
 
+  // Returns the trace visitor associated with the connection.
+  QuicTraceVisitor* absl_nullable trace_visitor() {
+    return trace_visitor_.get();
+  }
+
  protected:
   // A Writer object that writes into the |nic_tx_queue_|.
   class Writer : public QuicPacketWriter {