Update MoQT Stream errors to draft-11. PiperOrigin-RevId: 781991311
diff --git a/quiche/quic/moqt/moqt_messages.cc b/quiche/quic/moqt/moqt_messages.cc index 3cb9923..75dcf0f 100644 --- a/quiche/quic/moqt/moqt_messages.cc +++ b/quiche/quic/moqt/moqt_messages.cc
@@ -376,10 +376,12 @@ absl::Status MoqtStreamErrorToStatus(webtransport::StreamErrorCode error_code, absl::string_view reason_phrase) { switch (error_code) { - case kResetCodeSubscriptionGone: - return absl::NotFoundError(reason_phrase); - case kResetCodeTimedOut: + case kResetCodeCancelled: + return absl::CancelledError(reason_phrase); + case kResetCodeDeliveryTimeout: return absl::DeadlineExceededError(reason_phrase); + case kResetCodeSessionClosed: + return absl::AbortedError(reason_phrase); default: return absl::UnknownError(reason_phrase); }
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h index 2e6960d..7c1b609 100644 --- a/quiche/quic/moqt/moqt_messages.h +++ b/quiche/quic/moqt/moqt_messages.h
@@ -161,12 +161,10 @@ }; // Error codes used by MoQT to reset streams. -// TODO: update with spec-defined error codes once those are available, see -// <https://github.com/moq-wg/moq-transport/issues/481>. inline constexpr webtransport::StreamErrorCode kResetCodeUnknown = 0x00; -inline constexpr webtransport::StreamErrorCode kResetCodeSubscriptionGone = - 0x01; -inline constexpr webtransport::StreamErrorCode kResetCodeTimedOut = 0x02; +inline constexpr webtransport::StreamErrorCode kResetCodeCancelled = 0x01; +inline constexpr webtransport::StreamErrorCode kResetCodeDeliveryTimeout = 0x02; +inline constexpr webtransport::StreamErrorCode kResetCodeSessionClosed = 0x03; enum class QUICHE_EXPORT SetupParameter : uint64_t { kPath = 0x1,
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc index 2a202ed..464b800 100644 --- a/quiche/quic/moqt/moqt_session.cc +++ b/quiche/quic/moqt/moqt_session.cc
@@ -694,7 +694,7 @@ if (stream == nullptr) { continue; } - stream->ResetWithUserCode(kResetCodeSubscriptionGone); + stream->ResetWithUserCode(kResetCodeCancelled); } return true; } @@ -1696,7 +1696,7 @@ ? session_->RemoteTrackById(message.track_alias) : session_->RemoteTrackByAlias(message.track_alias); if (track == nullptr) { - stream_->SendStopSending(kResetCodeSubscriptionGone); + stream_->SendStopSending(kResetCodeCancelled); // Received object for nonexistent track. return; } @@ -1731,7 +1731,7 @@ UpstreamFetch::UpstreamFetchTask* task = fetch->task(); if (task == nullptr) { // The application killed the FETCH. - stream_->SendStopSending(kResetCodeSubscriptionGone); + stream_->SendStopSending(kResetCodeCancelled); return; } if (!task->HasObject()) { @@ -1824,7 +1824,7 @@ << "Received object for a track with no SUBSCRIBE"; // This is a not a session error because there might be an UNSUBSCRIBE in // flight. - stream_->SendStopSending(kResetCodeSubscriptionGone); + stream_->SendStopSending(kResetCodeCancelled); return; } it->second->OnStreamOpened(); @@ -1835,7 +1835,7 @@ QUIC_DLOG(INFO) << ENDPOINT << "Received object for a track with no FETCH"; // This is a not a session error because there might be an UNSUBSCRIBE in // flight. - stream_->SendStopSending(kResetCodeSubscriptionGone); + stream_->SendStopSending(kResetCodeCancelled); return; } if (it->second == nullptr) { @@ -2112,7 +2112,7 @@ if (raw_stream == nullptr) { continue; } - raw_stream->ResetWithUserCode(kResetCodeTimedOut); + raw_stream->ResetWithUserCode(kResetCodeDeliveryTimeout); } first_active_group_ = std::max(first_active_group_, group_id + 1); absl::erase_if(reset_subgroups_, [&](const DataStreamIndex& index) { @@ -2260,14 +2260,14 @@ if (it != stream_->session_->published_subscriptions_.end()) { it->second->OnStreamTimeout(stream_->index()); } - stream_->stream_->ResetWithUserCode(kResetCodeTimedOut); + stream_->stream_->ResetWithUserCode(kResetCodeDeliveryTimeout); } MoqtSession::PublishedSubscription* MoqtSession::OutgoingDataStream::GetSubscriptionIfValid() { auto it = session_->published_subscriptions_.find(subscription_id_); if (it == session_->published_subscriptions_.end()) { - stream_->ResetWithUserCode(kResetCodeSubscriptionGone); + stream_->ResetWithUserCode(kResetCodeCancelled); return nullptr; } @@ -2318,7 +2318,7 @@ object->metadata.arrival_time > delivery_timeout) { subscription.OnStreamTimeout(index_); - stream_->ResetWithUserCode(kResetCodeTimedOut); + stream_->ResetWithUserCode(kResetCodeDeliveryTimeout); return; }
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc index 32400fc..b269517 100644 --- a/quiche/quic/moqt/moqt_session_test.cc +++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -1420,7 +1420,7 @@ EXPECT_TRUE(correct_message); EXPECT_TRUE(fin); - EXPECT_CALL(mock_stream_, ResetWithUserCode(kResetCodeTimedOut)); + EXPECT_CALL(mock_stream_, ResetWithUserCode(kResetCodeDeliveryTimeout)); subscription->OnGroupAbandoned(5); } @@ -2966,7 +2966,7 @@ quic::QuicTimeDelta::FromSeconds(1), }, quiche::QuicheMemSlice(), false})); - EXPECT_CALL(data_mock, ResetWithUserCode(kResetCodeTimedOut)) + EXPECT_CALL(data_mock, ResetWithUserCode(kResetCodeDeliveryTimeout)) .WillOnce(Invoke([&](webtransport::StreamErrorCode /*error*/) { stream_visitor.reset(); })); @@ -3024,11 +3024,11 @@ quiche::QuicheMemSlice(), true})) .WillOnce(Return(std::nullopt)); EXPECT_CALL(data_mock, Writev(_, _)).WillOnce(Return(absl::OkStatus())); - EXPECT_CALL(data_mock, ResetWithUserCode(kResetCodeTimedOut)).Times(0); + EXPECT_CALL(data_mock, ResetWithUserCode(kResetCodeDeliveryTimeout)).Times(0); subscription->OnNewObjectAvailable(Location(0, 0), 0); auto* delivery_alarm = static_cast<quic::test::MockAlarmFactory::TestAlarm*>( MoqtSessionPeer::GetAlarm(stream_visitor.get())); - EXPECT_CALL(data_mock, ResetWithUserCode(kResetCodeTimedOut)) + EXPECT_CALL(data_mock, ResetWithUserCode(kResetCodeDeliveryTimeout)) .WillOnce(Invoke([&](webtransport::StreamErrorCode /*error*/) { stream_visitor.reset(); })); @@ -3081,7 +3081,7 @@ subscription->OnNewFinAvailable(Location(0, 0), 0); auto* delivery_alarm = static_cast<quic::test::MockAlarmFactory::TestAlarm*>( MoqtSessionPeer::GetAlarm(stream_visitor.get())); - EXPECT_CALL(data_mock, ResetWithUserCode(kResetCodeTimedOut)) + EXPECT_CALL(data_mock, ResetWithUserCode(kResetCodeDeliveryTimeout)) .WillOnce(Invoke([&](webtransport::StreamErrorCode /*error*/) { stream_visitor.reset(); })); @@ -3161,7 +3161,7 @@ // Group 1 should start the timer on the Group 0 stream. auto* delivery_alarm = static_cast<quic::test::MockAlarmFactory::TestAlarm*>( MoqtSessionPeer::GetAlarm(stream_visitor1.get())); - EXPECT_CALL(data_mock1, ResetWithUserCode(kResetCodeTimedOut)) + EXPECT_CALL(data_mock1, ResetWithUserCode(kResetCodeDeliveryTimeout)) .WillOnce(Invoke([&](webtransport::StreamErrorCode /*error*/) { stream_visitor1.reset(); }));