Add MoqtProbeManager

This is an auxiliary class responsible for sending and timing out probes.

PiperOrigin-RevId: 688186757
diff --git a/build/source_list.bzl b/build/source_list.bzl
index 5aeac68..4f150d5 100644
--- a/build/source_list.bzl
+++ b/build/source_list.bzl
@@ -1514,6 +1514,7 @@
     "quic/moqt/moqt_outgoing_queue.h",
     "quic/moqt/moqt_parser.h",
     "quic/moqt/moqt_priority.h",
+    "quic/moqt/moqt_probe_manager.h",
     "quic/moqt/moqt_publisher.h",
     "quic/moqt/moqt_session.h",
     "quic/moqt/moqt_subscribe_windows.h",
@@ -1545,6 +1546,8 @@
     "quic/moqt/moqt_parser_test.cc",
     "quic/moqt/moqt_priority.cc",
     "quic/moqt/moqt_priority_test.cc",
+    "quic/moqt/moqt_probe_manager.cc",
+    "quic/moqt/moqt_probe_manager_test.cc",
     "quic/moqt/moqt_session.cc",
     "quic/moqt/moqt_session_test.cc",
     "quic/moqt/moqt_subscribe_windows.cc",
diff --git a/build/source_list.gni b/build/source_list.gni
index ce85b0e..977a2eb 100644
--- a/build/source_list.gni
+++ b/build/source_list.gni
@@ -1518,6 +1518,7 @@
     "src/quiche/quic/moqt/moqt_outgoing_queue.h",
     "src/quiche/quic/moqt/moqt_parser.h",
     "src/quiche/quic/moqt/moqt_priority.h",
+    "src/quiche/quic/moqt/moqt_probe_manager.h",
     "src/quiche/quic/moqt/moqt_publisher.h",
     "src/quiche/quic/moqt/moqt_session.h",
     "src/quiche/quic/moqt/moqt_subscribe_windows.h",
@@ -1549,6 +1550,8 @@
     "src/quiche/quic/moqt/moqt_parser_test.cc",
     "src/quiche/quic/moqt/moqt_priority.cc",
     "src/quiche/quic/moqt/moqt_priority_test.cc",
+    "src/quiche/quic/moqt/moqt_probe_manager.cc",
+    "src/quiche/quic/moqt/moqt_probe_manager_test.cc",
     "src/quiche/quic/moqt/moqt_session.cc",
     "src/quiche/quic/moqt/moqt_session_test.cc",
     "src/quiche/quic/moqt/moqt_subscribe_windows.cc",
diff --git a/build/source_list.json b/build/source_list.json
index c10a0a9..0ebf8f2 100644
--- a/build/source_list.json
+++ b/build/source_list.json
@@ -1517,6 +1517,7 @@
     "quiche/quic/moqt/moqt_outgoing_queue.h",
     "quiche/quic/moqt/moqt_parser.h",
     "quiche/quic/moqt/moqt_priority.h",
+    "quiche/quic/moqt/moqt_probe_manager.h",
     "quiche/quic/moqt/moqt_publisher.h",
     "quiche/quic/moqt/moqt_session.h",
     "quiche/quic/moqt/moqt_subscribe_windows.h",
@@ -1548,6 +1549,8 @@
     "quiche/quic/moqt/moqt_parser_test.cc",
     "quiche/quic/moqt/moqt_priority.cc",
     "quiche/quic/moqt/moqt_priority_test.cc",
+    "quiche/quic/moqt/moqt_probe_manager.cc",
+    "quiche/quic/moqt/moqt_probe_manager_test.cc",
     "quiche/quic/moqt/moqt_session.cc",
     "quiche/quic/moqt/moqt_session_test.cc",
     "quiche/quic/moqt/moqt_subscribe_windows.cc",
diff --git a/quiche/quic/moqt/moqt_priority.cc b/quiche/quic/moqt/moqt_priority.cc
index 22a8f7a..b55546e 100644
--- a/quiche/quic/moqt/moqt_priority.cc
+++ b/quiche/quic/moqt/moqt_priority.cc
@@ -75,5 +75,7 @@
 
 const webtransport::SendOrder kMoqtControlStreamSendOrder =
     std::numeric_limits<webtransport::SendOrder>::max();
+const webtransport::SendOrder kMoqtProbeStreamSendOrder =
+    std::numeric_limits<webtransport::SendOrder>::min();
 
 }  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_priority.h b/quiche/quic/moqt/moqt_priority.h
index 15f8a55..a16d7e0 100644
--- a/quiche/quic/moqt/moqt_priority.h
+++ b/quiche/quic/moqt/moqt_priority.h
@@ -39,6 +39,9 @@
 // WebTransport send order set on the MoQT control stream.
 QUICHE_EXPORT extern const webtransport::SendOrder kMoqtControlStreamSendOrder;
 
+// WebTransport send order set on MoQT bandwidth probe streams.
+QUICHE_EXPORT extern const webtransport::SendOrder kMoqtProbeStreamSendOrder;
+
 }  // namespace moqt
 
 #endif  // QUICHE_QUIC_MOQT_MOQT_PRIORITY_H_
diff --git a/quiche/quic/moqt/moqt_probe_manager.cc b/quiche/quic/moqt/moqt_probe_manager.cc
new file mode 100644
index 0000000..c3745b3
--- /dev/null
+++ b/quiche/quic/moqt/moqt_probe_manager.cc
@@ -0,0 +1,137 @@
+// Copyright 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/quic/moqt/moqt_probe_manager.h"
+
+#include <algorithm>
+#include <memory>
+#include <optional>
+#include <utility>
+
+#include "absl/status/status.h"
+#include "absl/strings/string_view.h"
+#include "absl/types/span.h"
+#include "quiche/quic/core/quic_time.h"
+#include "quiche/quic/core/quic_types.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_priority.h"
+#include "quiche/common/platform/api/quiche_bug_tracker.h"
+#include "quiche/common/platform/api/quiche_logging.h"
+#include "quiche/common/quiche_stream.h"
+#include "quiche/common/wire_serialization.h"
+#include "quiche/web_transport/web_transport.h"
+
+namespace moqt {
+
+namespace {
+constexpr quic::QuicByteCount kWriteChunkSize = 4096;
+constexpr char kZeroes[kWriteChunkSize] = {0};
+}  // namespace
+
+std::optional<ProbeId> MoqtProbeManager::StartProbe(
+    quic::QuicByteCount probe_size, quic::QuicTimeDelta timeout,
+    Callback callback) {
+  if (probe_.has_value()) {
+    return std::nullopt;
+  }
+
+  ProbeId id = next_probe_id_++;
+  webtransport::Stream* stream = session_->OpenOutgoingUnidirectionalStream();
+  if (stream == nullptr) {
+    return std::nullopt;
+  }
+
+  probe_ = PendingProbe{
+      id,         clock_->ApproximateNow(), clock_->ApproximateNow() + timeout,
+      probe_size, stream->GetStreamId(),    std::move(callback)};
+  auto visitor_owned =
+      std::make_unique<ProbeStreamVisitor>(this, stream, id, probe_size);
+  ProbeStreamVisitor* visitor = visitor_owned.get();
+  stream->SetVisitor(std::move(visitor_owned));
+  stream->SetPriority(webtransport::StreamPriority{
+      /*send_group_id=*/0, /*send_order=*/kMoqtProbeStreamSendOrder});
+  visitor->OnCanWrite();
+  RescheduleAlarm();
+  return id;
+}
+
+std::optional<ProbeId> MoqtProbeManager::StopProbe() {
+  if (!probe_.has_value()) {
+    return std::nullopt;
+  }
+  ProbeId id = probe_->id;
+  ClosePendingProbe(ProbeStatus::kAborted);
+  return id;
+}
+
+void MoqtProbeManager::ProbeStreamVisitor::OnCanWrite() {
+  if (!ValidateProbe() || !stream_->CanWrite()) {
+    return;
+  }
+
+  if (!header_sent_) {
+    absl::Status status = quiche::WriteIntoStream(
+        *stream_, *quiche::SerializeIntoString(
+                      quiche::WireVarInt62(MoqtDataStreamType::kPadding)));
+    QUICHE_DCHECK(status.ok()) << status;  // Should succeed if CanWrite().
+    header_sent_ = true;
+  }
+
+  while (stream_->CanWrite() && data_remaining_ > 0) {
+    quic::QuicByteCount chunk_size = std::min(kWriteChunkSize, data_remaining_);
+    absl::string_view chunk(kZeroes, chunk_size);
+    quiche::StreamWriteOptions options;
+    options.set_send_fin(chunk_size == data_remaining_);
+    absl::Status status = stream_->Writev(absl::MakeSpan(&chunk, 1), options);
+    QUICHE_DCHECK(status.ok()) << status;  // Should succeed if CanWrite().
+    data_remaining_ -= chunk_size;
+  }
+}
+
+void MoqtProbeManager::ProbeStreamVisitor::OnStopSendingReceived(
+    webtransport::StreamErrorCode error) {
+  if (!ValidateProbe()) {
+    return;
+  }
+  manager_->ClosePendingProbe(ProbeStatus::kAborted);
+}
+
+void MoqtProbeManager::ProbeStreamVisitor::OnWriteSideInDataRecvdState() {
+  if (!ValidateProbe()) {
+    return;
+  }
+  manager_->ClosePendingProbe(ProbeStatus::kSuccess);
+}
+
+void MoqtProbeManager::RescheduleAlarm() {
+  quic::QuicTime deadline =
+      probe_.has_value() ? probe_->deadline : quic::QuicTime::Zero();
+  timeout_alarm_->Update(deadline, quic::QuicTimeDelta::Zero());
+}
+
+void MoqtProbeManager::OnAlarm() {
+  if (probe_.has_value()) {
+    ClosePendingProbe(ProbeStatus::kTimeout);
+  }
+  RescheduleAlarm();
+}
+
+void MoqtProbeManager::ClosePendingProbe(ProbeStatus status) {
+  std::optional<PendingProbe> probe = std::move(probe_);
+  if (!probe.has_value()) {
+    QUICHE_BUG(ClosePendingProbe_no_probe);
+    return;
+  }
+  if (status != ProbeStatus::kSuccess) {
+    webtransport::Stream* stream = session_->GetStreamById(probe->stream_id);
+    if (stream != nullptr) {
+      // TODO: figure out the error code.
+      stream->ResetWithUserCode(0);
+    }
+  }
+  quic::QuicTime now = clock_->ApproximateNow();
+  std::move(probe->callback)(
+      ProbeResult{probe->id, status, probe->probe_size, now - probe->start});
+}
+}  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_probe_manager.h b/quiche/quic/moqt/moqt_probe_manager.h
new file mode 100644
index 0000000..0458a92
--- /dev/null
+++ b/quiche/quic/moqt/moqt_probe_manager.h
@@ -0,0 +1,149 @@
+// Copyright 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_MOQT_MOQT_PROBE_MANAGER_H_
+#define QUICHE_QUIC_MOQT_MOQT_PROBE_MANAGER_H_
+
+#include <cstdint>
+#include <memory>
+#include <optional>
+
+#include "quiche/quic/core/quic_alarm.h"
+#include "quiche/quic/core/quic_alarm_factory.h"
+#include "quiche/quic/core/quic_clock.h"
+#include "quiche/quic/core/quic_time.h"
+#include "quiche/quic/core/quic_types.h"
+#include "quiche/common/quiche_callbacks.h"
+#include "quiche/web_transport/web_transport.h"
+
+namespace moqt {
+
+// ID of a probe.
+using ProbeId = uint64_t;
+
+// Potential outcomes of a proge.
+enum class ProbeStatus : uint8_t {
+  // Probe has finished successfully.
+  kSuccess,
+  // Probe has timed out.
+  kTimeout,
+  // Probe has been aborted, via a STOP_SENDING or for some other reason.
+  kAborted,
+};
+
+// Represents the results of a probe.
+struct ProbeResult {
+  ProbeId id;
+  ProbeStatus status;
+  // The number of bytes requested on the probe.
+  quic::QuicByteCount probe_size;
+  // Time elapsed between the time the probe was requested and now.
+  quic::QuicTimeDelta time_elapsed;
+};
+
+// Interface used to mock out MoqtProbeManager.
+class MoqtProbeManagerInterface {
+ public:
+  using Callback = quiche::SingleUseCallback<void(const ProbeResult& result)>;
+
+  virtual ~MoqtProbeManagerInterface() = default;
+
+  // Starts the probe.  Returns the ID of the probe, or nullopt if the probe
+  // cannot be started.  Will fail if a probe is already pending.
+  virtual std::optional<ProbeId> StartProbe(quic::QuicByteCount probe_size,
+                                            quic::QuicTimeDelta timeout,
+                                            Callback callback) = 0;
+  // Cancels the currently pending probe.
+  virtual std::optional<ProbeId> StopProbe() = 0;
+};
+
+namespace test {
+class MoqtProbeManagerPeer;
+}
+
+// MoqtProbeManager keeps track of the pending bandwidth probe, including
+// ensuring there is only one probe pending, and handling the timeout.
+class MoqtProbeManager : public MoqtProbeManagerInterface {
+ public:
+  explicit MoqtProbeManager(webtransport::Session* session,
+                            const quic::QuicClock* clock,
+                            quic::QuicAlarmFactory& alarm_factory)
+      : session_(session),
+        clock_(clock),
+        timeout_alarm_(alarm_factory.CreateAlarm(new AlarmDelegate(this))) {}
+
+  // MoqtProbeManagerInterface implementation.
+  std::optional<ProbeId> StartProbe(quic::QuicByteCount probe_size,
+                                    quic::QuicTimeDelta timeout,
+                                    Callback callback) override;
+  std::optional<ProbeId> StopProbe() override;
+
+ private:
+  friend class ::moqt::test::MoqtProbeManagerPeer;
+
+  struct PendingProbe {
+    ProbeId id;
+    quic::QuicTime start;
+    quic::QuicTime deadline;
+    quic::QuicByteCount probe_size;
+    webtransport::StreamId stream_id;
+    Callback callback;
+  };
+
+  class ProbeStreamVisitor : public webtransport::StreamVisitor {
+   public:
+    ProbeStreamVisitor(MoqtProbeManager* manager, webtransport::Stream* stream,
+                       ProbeId probe_id, quic::QuicByteCount probe_size)
+        : manager_(manager),
+          stream_(stream),
+          probe_id_(probe_id),
+          data_remaining_(probe_size) {}
+
+    void OnCanRead() override {}
+    void OnCanWrite() override;
+    void OnResetStreamReceived(webtransport::StreamErrorCode error) override {}
+    void OnStopSendingReceived(webtransport::StreamErrorCode error) override;
+    void OnWriteSideInDataRecvdState() override;
+
+   private:
+    // Ensures the stream is associated with the currently active probe.
+    bool ValidateProbe() {
+      if (!manager_->probe_.has_value() || manager_->probe_->id != probe_id_) {
+        // TODO: figure out the error code.
+        stream_->ResetWithUserCode(0);
+        return false;
+      }
+      return true;
+    }
+
+    MoqtProbeManager* manager_;
+    webtransport::Stream* stream_;
+    ProbeId probe_id_;
+    bool header_sent_ = false;
+    quic::QuicByteCount data_remaining_;
+  };
+
+  class AlarmDelegate : public quic::QuicAlarm::DelegateWithoutContext {
+   public:
+    explicit AlarmDelegate(MoqtProbeManager* manager) : manager_(manager) {}
+    void OnAlarm() override { manager_->OnAlarm(); }
+
+   private:
+    MoqtProbeManager* manager_;
+  };
+
+  void RescheduleAlarm();
+  void OnAlarm();
+  void ClosePendingProbe(ProbeStatus status);
+
+  std::optional<PendingProbe> probe_;
+  webtransport::Session* session_;
+  const quic::QuicClock* clock_;
+  std::unique_ptr<quic::QuicAlarm> timeout_alarm_;
+  ProbeId next_probe_id_ = 0;
+};
+
+}  // namespace moqt
+
+#endif  // QUICHE_QUIC_MOQT_MOQT_PROBE_MANAGER_H_
diff --git a/quiche/quic/moqt/moqt_probe_manager_test.cc b/quiche/quic/moqt/moqt_probe_manager_test.cc
new file mode 100644
index 0000000..a60cfa2
--- /dev/null
+++ b/quiche/quic/moqt/moqt_probe_manager_test.cc
@@ -0,0 +1,234 @@
+// Copyright 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/quic/moqt/moqt_probe_manager.h"
+
+#include <cstddef>
+#include <memory>
+#include <optional>
+#include <string>
+#include <utility>
+
+#include "absl/status/status.h"
+#include "absl/strings/string_view.h"
+#include "absl/types/span.h"
+#include "quiche/quic/core/quic_time.h"
+#include "quiche/quic/core/quic_types.h"
+#include "quiche/quic/test_tools/mock_clock.h"
+#include "quiche/quic/test_tools/quic_test_utils.h"
+#include "quiche/common/platform/api/quiche_logging.h"
+#include "quiche/common/platform/api/quiche_test.h"
+#include "quiche/common/quiche_stream.h"
+#include "quiche/web_transport/test_tools/mock_web_transport.h"
+#include "quiche/web_transport/web_transport.h"
+
+namespace moqt::test {
+
+using TestAlarm = quic::test::MockAlarmFactory::TestAlarm;
+
+class MoqtProbeManagerPeer {
+ public:
+  static TestAlarm* GetAlarm(MoqtProbeManager& manager) {
+    return static_cast<TestAlarm*>(manager.timeout_alarm_.get());
+  }
+};
+
+namespace {
+
+using ::testing::_;
+using ::testing::Return;
+
+// Two-byte varint.
+constexpr size_t kProbeStreamHeaderSize = 2;
+
+class MockStream : public webtransport::test::MockStream {
+ public:
+  MockStream(webtransport::StreamId id) : id_(id) {}
+
+  webtransport::StreamId GetStreamId() const override { return id_; }
+  absl::Status Writev(absl::Span<const absl::string_view> data,
+                      const quiche::StreamWriteOptions& options) override {
+    QUICHE_CHECK(!fin_) << "FIN written twice.";
+    for (absl::string_view chunk : data) {
+      data_.append(chunk);
+    }
+    fin_ = options.send_fin();
+    return absl::OkStatus();
+  }
+  void SetVisitor(std::unique_ptr<webtransport::StreamVisitor> visitor) {
+    visitor_ = std::move(visitor);
+  }
+  webtransport::StreamVisitor* visitor() override { return visitor_.get(); }
+
+  absl::string_view data() const { return data_; }
+  bool fin() const { return fin_; }
+
+ private:
+  webtransport::StreamId id_;
+  std::unique_ptr<webtransport::StreamVisitor> visitor_ = nullptr;
+  std::string data_;
+  bool fin_ = false;
+};
+
+class MoqtProbeManagerTest : public quiche::test::QuicheTest {
+ protected:
+  MoqtProbeManagerTest() : manager_(&session_, &clock_, alarm_factory_) {}
+
+  webtransport::test::MockSession session_;
+  quic::MockClock clock_;
+  quic::test::MockAlarmFactory alarm_factory_;
+  MoqtProbeManager manager_;
+};
+
+TEST_F(MoqtProbeManagerTest, AddProbe) {
+  constexpr webtransport::StreamId kStreamId = 17;
+  constexpr quic::QuicByteCount kProbeSize = 8192 + 1;
+  constexpr quic::QuicTimeDelta kProbeDuration =
+      quic::QuicTimeDelta::FromMilliseconds(100);
+
+  MockStream stream(kStreamId);
+  EXPECT_CALL(session_, OpenOutgoingUnidirectionalStream())
+      .WillOnce(Return(&stream));
+  EXPECT_CALL(stream, CanWrite()).WillRepeatedly(Return(true));
+  std::optional<ProbeResult> result;
+  std::optional<ProbeId> probe_id =
+      manager_.StartProbe(kProbeSize, 3 * kProbeDuration,
+                          [&](const ProbeResult& r) { result = r; });
+  ASSERT_NE(probe_id, std::nullopt);
+  ASSERT_EQ(result, std::nullopt);
+
+  EXPECT_TRUE(stream.fin());
+  EXPECT_EQ(stream.data().size(), kProbeSize + kProbeStreamHeaderSize);
+
+  clock_.AdvanceTime(kProbeDuration);
+  stream.visitor()->OnWriteSideInDataRecvdState();
+
+  ASSERT_NE(result, std::nullopt);
+  EXPECT_EQ(result->id, probe_id);
+  EXPECT_EQ(result->status, ProbeStatus::kSuccess);
+  EXPECT_EQ(result->probe_size, kProbeSize);
+  EXPECT_EQ(result->time_elapsed, kProbeDuration);
+}
+
+TEST_F(MoqtProbeManagerTest, AddProbeWriteBlockedInTheMiddle) {
+  constexpr webtransport::StreamId kStreamId = 17;
+  constexpr quic::QuicByteCount kProbeSize = 8192 + 1;
+  constexpr quic::QuicTimeDelta kProbeDuration =
+      quic::QuicTimeDelta::FromMilliseconds(100);
+
+  MockStream stream(kStreamId);
+  EXPECT_CALL(session_, OpenOutgoingUnidirectionalStream())
+      .WillOnce(Return(&stream));
+  EXPECT_CALL(stream, CanWrite())
+      .WillOnce(Return(true))
+      .WillOnce(Return(true))
+      .WillOnce(Return(false));
+  std::optional<ProbeId> probe_id = manager_.StartProbe(
+      kProbeSize, 3 * kProbeDuration, [&](const ProbeResult& r) {});
+  ASSERT_NE(probe_id, std::nullopt);
+
+  EXPECT_FALSE(stream.fin());
+  EXPECT_LT(stream.data().size(), kProbeSize);
+
+  EXPECT_CALL(stream, CanWrite()).WillRepeatedly(Return(true));
+  stream.visitor()->OnCanWrite();
+  EXPECT_TRUE(stream.fin());
+  EXPECT_EQ(stream.data().size(), kProbeSize + kProbeStreamHeaderSize);
+}
+
+TEST_F(MoqtProbeManagerTest, ProbeCancelledByPeer) {
+  constexpr webtransport::StreamId kStreamId = 17;
+  constexpr quic::QuicByteCount kProbeSize = 8192 + 1;
+  constexpr quic::QuicTimeDelta kProbeDuration =
+      quic::QuicTimeDelta::FromMilliseconds(100);
+
+  MockStream stream(kStreamId);
+  EXPECT_CALL(session_, OpenOutgoingUnidirectionalStream())
+      .WillOnce(Return(&stream));
+  EXPECT_CALL(stream, CanWrite()).WillRepeatedly(Return(true));
+  std::optional<ProbeResult> result;
+  std::optional<ProbeId> probe_id =
+      manager_.StartProbe(kProbeSize, 3 * kProbeDuration,
+                          [&](const ProbeResult& r) { result = r; });
+  ASSERT_NE(probe_id, std::nullopt);
+  ASSERT_EQ(result, std::nullopt);
+
+  EXPECT_TRUE(stream.fin());
+  EXPECT_EQ(stream.data().size(), kProbeSize + kProbeStreamHeaderSize);
+
+  clock_.AdvanceTime(kProbeDuration * 0.5);
+  stream.visitor()->OnStopSendingReceived(/*error=*/0);
+
+  ASSERT_NE(result, std::nullopt);
+  EXPECT_EQ(result->id, probe_id);
+  EXPECT_EQ(result->status, ProbeStatus::kAborted);
+  EXPECT_EQ(result->time_elapsed, kProbeDuration * 0.5);
+}
+
+TEST_F(MoqtProbeManagerTest, ProbeCancelledByClient) {
+  constexpr webtransport::StreamId kStreamId = 17;
+  constexpr quic::QuicByteCount kProbeSize = 8192 + 1;
+  constexpr quic::QuicTimeDelta kProbeDuration =
+      quic::QuicTimeDelta::FromMilliseconds(100);
+
+  MockStream stream(kStreamId);
+  EXPECT_CALL(session_, OpenOutgoingUnidirectionalStream())
+      .WillOnce(Return(&stream));
+  EXPECT_CALL(stream, CanWrite()).WillRepeatedly(Return(true));
+  std::optional<ProbeResult> result;
+  std::optional<ProbeId> probe_id =
+      manager_.StartProbe(kProbeSize, 3 * kProbeDuration,
+                          [&](const ProbeResult& r) { result = r; });
+  ASSERT_NE(probe_id, std::nullopt);
+  ASSERT_EQ(result, std::nullopt);
+
+  EXPECT_TRUE(stream.fin());
+  EXPECT_EQ(stream.data().size(), kProbeSize + kProbeStreamHeaderSize);
+
+  EXPECT_CALL(session_, GetStreamById(kStreamId)).WillOnce(Return(&stream));
+  EXPECT_CALL(stream, ResetWithUserCode(_));
+  clock_.AdvanceTime(kProbeDuration * 0.5);
+  manager_.StopProbe();
+  ASSERT_NE(result, std::nullopt);
+  EXPECT_EQ(result->id, probe_id);
+  EXPECT_EQ(result->status, ProbeStatus::kAborted);
+  EXPECT_EQ(result->time_elapsed, kProbeDuration * 0.5);
+}
+
+TEST_F(MoqtProbeManagerTest, Timeout) {
+  constexpr webtransport::StreamId kStreamId = 17;
+  constexpr quic::QuicByteCount kProbeSize = 8192 + 1;
+  constexpr quic::QuicTimeDelta kProbeDuration =
+      quic::QuicTimeDelta::FromMilliseconds(100);
+  const quic::QuicTimeDelta kTimeout = 0.5 * kProbeDuration;
+
+  MockStream stream(kStreamId);
+  EXPECT_CALL(session_, OpenOutgoingUnidirectionalStream())
+      .WillOnce(Return(&stream));
+  EXPECT_CALL(stream, CanWrite()).WillRepeatedly(Return(true));
+  std::optional<ProbeResult> result;
+  std::optional<ProbeId> probe_id = manager_.StartProbe(
+      kProbeSize, kTimeout, [&](const ProbeResult& r) { result = r; });
+  ASSERT_NE(probe_id, std::nullopt);
+  ASSERT_EQ(result, std::nullopt);
+
+  EXPECT_TRUE(stream.fin());
+  EXPECT_EQ(stream.data().size(), kProbeSize + kProbeStreamHeaderSize);
+
+  clock_.AdvanceTime(kTimeout);
+  TestAlarm* alarm = MoqtProbeManagerPeer::GetAlarm(manager_);
+  EXPECT_EQ(alarm->deadline(), clock_.Now());
+
+  EXPECT_CALL(session_, GetStreamById(kStreamId)).WillOnce(Return(&stream));
+  EXPECT_CALL(stream, ResetWithUserCode(_));
+  alarm->Fire();
+  ASSERT_NE(result, std::nullopt);
+  EXPECT_EQ(result->id, probe_id);
+  EXPECT_EQ(result->status, ProbeStatus::kTimeout);
+  EXPECT_EQ(result->probe_size, kProbeSize);
+  EXPECT_EQ(result->time_elapsed, kTimeout);
+}
+
+}  // namespace
+}  // namespace moqt::test