Clean up minor issues from MOQT draft-16.

- Duplicate SUBSCRIBE is now a Request Error, not session error

- Always Reset streams as publisher when a subscribe is done (now including when there is UNSUBSCRIBE).

- Do not enforce that request_id increments by 2 (due to an interop bug in draft-16, but apply other checks).

- Update error codes

Some features, like NEW_GROUP_REQUEST, have bugs filed for them.

FETCH stream serialization still to come.

PiperOrigin-RevId: 872095758
diff --git a/quiche/quic/moqt/moqt_error.cc b/quiche/quic/moqt/moqt_error.cc
index 97dbae5..4e46e5f 100644
--- a/quiche/quic/moqt/moqt_error.cc
+++ b/quiche/quic/moqt/moqt_error.cc
@@ -21,7 +21,7 @@
     case absl::StatusCode::kUnimplemented:
       return RequestErrorCode::kNotSupported;
     case absl::StatusCode::kNotFound:
-      return RequestErrorCode::kTrackDoesNotExist;
+      return RequestErrorCode::kDoesNotExist;
     case absl::StatusCode::kOutOfRange:
       return RequestErrorCode::kInvalidRange;
     case absl::StatusCode::kInvalidArgument:
@@ -43,15 +43,10 @@
       return absl::StatusCode::kDeadlineExceeded;
     case RequestErrorCode::kNotSupported:
       return absl::StatusCode::kUnimplemented;
-    case RequestErrorCode::kTrackDoesNotExist:
-      // Equivalently, kUninterested and kNamespacePrefixUnknown.
+    case RequestErrorCode::kDoesNotExist:
       return absl::StatusCode::kNotFound;
     case RequestErrorCode::kInvalidRange:
-      // Equivalently, kNamespacePrefixOverlap.
       return absl::StatusCode::kOutOfRange;
-    case RequestErrorCode::kNoObjects:
-      // Equivalently, kRetryTrackAlias.
-      return absl::StatusCode::kNotFound;
     case RequestErrorCode::kInvalidJoiningRequestId:
     case RequestErrorCode::kMalformedAuthToken:
       return absl::StatusCode::kInvalidArgument;
@@ -70,12 +65,18 @@
 absl::Status MoqtStreamErrorToStatus(webtransport::StreamErrorCode error_code,
                                      absl::string_view reason_phrase) {
   switch (error_code) {
-    case kResetCodeCanceled:
+    case kResetCodeInternalError:
+      return absl::InternalError(reason_phrase);
+    case kResetCodeCancelled:
       return absl::CancelledError(reason_phrase);
     case kResetCodeDeliveryTimeout:
       return absl::DeadlineExceededError(reason_phrase);
     case kResetCodeSessionClosed:
       return absl::AbortedError(reason_phrase);
+    case kResetCodeUnknownObjectStatus:
+      return absl::FailedPreconditionError(reason_phrase);
+    case kResetCodeTooFarBehind:
+      return absl::DeadlineExceededError(reason_phrase);
     case kResetCodeMalformedTrack:
       return absl::InvalidArgumentError(reason_phrase);
     default:
diff --git a/quiche/quic/moqt/moqt_error.h b/quiche/quic/moqt/moqt_error.h
index 085af18..7542832 100644
--- a/quiche/quic/moqt/moqt_error.h
+++ b/quiche/quic/moqt/moqt_error.h
@@ -43,13 +43,14 @@
 };
 
 // Error codes used by MoQT to reset streams.
-inline constexpr webtransport::StreamErrorCode kResetCodeUnknown = 0x00;
-inline constexpr webtransport::StreamErrorCode kResetCodeCanceled = 0x01;
+inline constexpr webtransport::StreamErrorCode kResetCodeInternalError = 0x00;
+inline constexpr webtransport::StreamErrorCode kResetCodeCancelled = 0x01;
 inline constexpr webtransport::StreamErrorCode kResetCodeDeliveryTimeout = 0x02;
 inline constexpr webtransport::StreamErrorCode kResetCodeSessionClosed = 0x03;
-// TODO(martinduke): This is not in the spec, but is needed. The number might
-// change.
-inline constexpr webtransport::StreamErrorCode kResetCodeMalformedTrack = 0x04;
+inline constexpr webtransport::StreamErrorCode kResetCodeUnknownObjectStatus =
+    0x04;
+inline constexpr webtransport::StreamErrorCode kResetCodeMalformedTrack = 0x12;
+// Proposed in a PR post draft-16.
 inline constexpr webtransport::StreamErrorCode kResetCodeTooFarBehind = 0x05;
 
 // Used for SUBSCRIBE_ERROR, PUBLISH_NAMESPACE_ERROR, PUBLISH_NAMESPACE_CANCEL,
@@ -59,20 +60,28 @@
   kUnauthorized = 0x1,
   kTimeout = 0x2,
   kNotSupported = 0x3,
-  kTrackDoesNotExist = 0x4,  // SUBSCRIBE_ERROR and FETCH_ERROR only.
-  kUninterested =
-      0x4,  // PUBLISH_NAMESPACE_ERROR and PUBLISH_NAMESPACE_CANCEL only.
-  kNamespacePrefixUnknown = 0x4,   // SUBSCRIBE_NAMESPACE_ERROR only.
-  kInvalidRange = 0x5,             // SUBSCRIBE_ERROR and FETCH_ERROR only.
-  kNamespacePrefixOverlap = 0x5,   // SUBSCRIBE_NAMESPACE_ERROR only.
-  kNoObjects = 0x6,                // FETCH_ERROR only.
-  kInvalidJoiningRequestId = 0x7,  // FETCH_ERROR only.
-  kUnknownStatusInRange = 0x8,     // FETCH_ERROR only.
-  kMalformedTrack = 0x9,
-  kMalformedAuthToken = 0x10,
-  kExpiredAuthToken = 0x12,
+  kMalformedAuthToken = 0x4,
+  kExpiredAuthToken = 0x5,
+  kDoesNotExist = 0x10,
+  kInvalidRange = 0x11,
+  kMalformedTrack = 0x12,
   kDuplicateSubscription = 0x19,
+  kUninterested = 0x20,
+  kNamespacePrefixUnknown = 0x21,
   kPrefixOverlap = 0x30,
+  kInvalidJoiningRequestId = 0x32,
+};
+
+enum class QUICHE_EXPORT PublishDoneCode : uint64_t {
+  kInternalError = 0x0,
+  kUnauthorized = 0x1,
+  kTrackEnded = 0x2,
+  kSubscriptionEnded = 0x3,
+  kGoingAway = 0x4,
+  kExpired = 0x5,
+  kTooFarBehind = 0x6,
+  kUpdateFailed = 0x8,
+  kMalformedTrack = 0x12,
 };
 
 struct MoqtRequestErrorInfo {
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h
index a05b7d7..fdb35e5 100644
--- a/quiche/quic/moqt/moqt_messages.h
+++ b/quiche/quic/moqt/moqt_messages.h
@@ -368,17 +368,6 @@
   uint64_t request_id;
 };
 
-enum class QUICHE_EXPORT PublishDoneCode : uint64_t {
-  kInternalError = 0x0,
-  kUnauthorized = 0x1,
-  kTrackEnded = 0x2,
-  kSubscriptionEnded = 0x3,
-  kGoingAway = 0x4,
-  kExpired = 0x5,
-  kTooFarBehind = 0x6,
-  kMalformedTrack = 0x7,
-};
-
 struct QUICHE_EXPORT MoqtPublishDone {
   uint64_t request_id;
   PublishDoneCode status_code;
diff --git a/quiche/quic/moqt/moqt_namespace_stream.cc b/quiche/quic/moqt/moqt_namespace_stream.cc
index 206d04c..3ac34a3 100644
--- a/quiche/quic/moqt/moqt_namespace_stream.cc
+++ b/quiche/quic/moqt/moqt_namespace_stream.cc
@@ -165,7 +165,7 @@
 
 MoqtNamespaceSubscriberStream::NamespaceTask::~NamespaceTask() {
   if (state_ != nullptr) {
-    state_->Reset(kResetCodeCanceled);
+    state_->Reset(kResetCodeCancelled);
   }
 }
 
@@ -336,7 +336,7 @@
         };
         return;
       case kError:
-        Reset(kResetCodeCanceled);
+        Reset(kResetCodeCancelled);
         return;
       case kSuccess: {
         switch (type) {
diff --git a/quiche/quic/moqt/moqt_relay_track_publisher.cc b/quiche/quic/moqt/moqt_relay_track_publisher.cc
index 05df4a7..1b44c8d 100644
--- a/quiche/quic/moqt/moqt_relay_track_publisher.cc
+++ b/quiche/quic/moqt/moqt_relay_track_publisher.cc
@@ -221,7 +221,7 @@
   for (auto& [group, group_data] : queue_) {
     for (auto& [subgroup, subgroup_data] : group_data.subgroups) {
       for (MoqtObjectListener* listener : listeners_) {
-        listener->OnSubgroupAbandoned(group, subgroup, kResetCodeCanceled);
+        listener->OnSubgroupAbandoned(group, subgroup, kResetCodeCancelled);
       }
     }
   }
@@ -265,7 +265,7 @@
   }
   for (MoqtObjectListener* listener : listeners_) {
     listener->OnSubgroupAbandoned(stream.group, stream.subgroup,
-                                  kResetCodeCanceled);
+                                  kResetCodeCancelled);
   }
 }
 
diff --git a/quiche/quic/moqt/moqt_relay_track_publisher_test.cc b/quiche/quic/moqt/moqt_relay_track_publisher_test.cc
index 37075b0..fc1cb5a 100644
--- a/quiche/quic/moqt/moqt_relay_track_publisher_test.cc
+++ b/quiche/quic/moqt/moqt_relay_track_publisher_test.cc
@@ -446,7 +446,7 @@
 TEST_F(MoqtRelayTrackPublisherTest, Reset) {
   SubscribeAndOk();
 
-  EXPECT_CALL(listener_, OnSubgroupAbandoned(2, 0, kResetCodeCanceled));
+  EXPECT_CALL(listener_, OnSubgroupAbandoned(2, 0, kResetCodeCancelled));
   publisher_.OnStreamReset(kTrackName, DataStreamIndex{2, 0});
 }
 
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 441b52e..fff6197 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -108,8 +108,6 @@
   }
   if (parameters_.perspective == Perspective::IS_SERVER) {
     next_request_id_ = 1;
-  } else {
-    next_incoming_request_id_ = 1;
   }
   QUICHE_DCHECK(parameters_.moqt_implementation.empty());
   parameters_.moqt_implementation = kImplementationName;
@@ -712,26 +710,19 @@
   }
 
   PublishedSubscription& subscription = *it->second;
-  std::vector<webtransport::StreamId> streams_to_reset =
-      subscription.GetAllStreams();
 
   MoqtPublishDone publish_done;
   publish_done.request_id = request_id;
   publish_done.status_code = code;
   publish_done.stream_count = subscription.streams_opened();
   publish_done.error_reason = error_reason;
-  SendControlMessage(framer_.SerializePublishDone(publish_done));
-  QUIC_DLOG(INFO) << ENDPOINT << "Sent PUBLISH_DONE message for "
+  // TODO(martinduke): It is technically correct, but not good, to simply
+  // reset all the streams in order to send PUBLISH_DONE. It's better to wait
+  // until streams FIN naturally, where possible.
+  QUIC_DLOG(INFO) << ENDPOINT << "Sending PUBLISH_DONE message for "
                   << subscription.publisher().GetTrackName();
-  // Clean up the subscription
   published_subscriptions_.erase(it);
-  for (webtransport::StreamId stream_id : streams_to_reset) {
-    webtransport::Stream* stream = session_->GetStreamById(stream_id);
-    if (stream == nullptr) {
-      continue;
-    }
-    stream->ResetWithUserCode(kResetCodeCanceled);
-  }
+  SendControlMessage(framer_.SerializePublishDone(publish_done));
   return true;
 }
 
@@ -904,20 +895,27 @@
     Error(MoqtError::kTooManyRequests, "Received request with too large ID");
     return false;
   }
-  if (request_id != next_incoming_request_id_) {
-    QUICHE_DLOG(INFO) << ENDPOINT << "Request ID not monotonically increasing";
-    Error(MoqtError::kInvalidRequestId,
-          "Request ID not monotonically increasing");
+  if ((request_id % 2 == 0) !=
+      (parameters_.perspective == Perspective::IS_SERVER)) {
+    QUICHE_DLOG(INFO) << ENDPOINT << "Request ID evenness incorrect";
+    Error(MoqtError::kInvalidRequestId, "Request ID evenness incorrect");
     return false;
   }
-  next_incoming_request_id_ = request_id + 2;
+  if (published_subscriptions_.contains(request_id) ||
+      incoming_fetches_.contains(request_id) ||
+      incoming_track_status_.contains(request_id) ||
+      incoming_publish_namespaces_by_id_.contains(request_id)) {
+    QUICHE_DLOG(INFO) << ENDPOINT << "Duplicate request ID";
+    Error(MoqtError::kInvalidRequestId, "Duplicate request ID");
+    return false;
+  }
   return true;
 }
 
 void MoqtSession::UnknownBidiStream::OnCanRead() {
   if (!parser_.ReadUntilMessageTypeKnown()) {
     // Got an early FIN.
-    stream_->ResetWithUserCode(kResetCodeCanceled);
+    stream_->ResetWithUserCode(kResetCodeCancelled);
     return;
   }
   if (!parser_.message_type().has_value()) {
@@ -1030,8 +1028,9 @@
     return;
   }
   if (session_->subscribed_track_names_.contains(message.full_track_name)) {
-    session_->Error(MoqtError::kProtocolViolation,
-                    "Duplicate subscribe for track");
+    SendRequestError(message.request_id,
+                     RequestErrorCode::kDuplicateSubscription, std::nullopt,
+                     "");
     return;
   }
   const FullTrackName& track_name = message.full_track_name;
@@ -1040,7 +1039,7 @@
   if (track_publisher == nullptr) {
     QUIC_DLOG(INFO) << ENDPOINT << "SUBSCRIBE for " << track_name
                     << " rejected by the application: does not exist";
-    SendRequestError(message.request_id, RequestErrorCode::kTrackDoesNotExist,
+    SendRequestError(message.request_id, RequestErrorCode::kDoesNotExist,
                      std::nullopt, "not found");
     return;
   }
@@ -1137,9 +1136,9 @@
   if (ru_it != session_->pending_subscribe_updates_.end()) {
     auto sub_it = session_->subscribe_by_name_.find(ru_it->second.name);
     if (sub_it == session_->subscribe_by_name_.end()) {
-      std::move(ru_it->second.response_callback)(MoqtRequestErrorInfo{
-          RequestErrorCode::kTrackDoesNotExist, std::nullopt,
-          "subscription does not exist anymore"});
+      std::move(ru_it->second.response_callback)(
+          MoqtRequestErrorInfo{RequestErrorCode::kDoesNotExist, std::nullopt,
+                               "subscription does not exist anymore"});
       session_->pending_subscribe_updates_.erase(ru_it);
       return;
     }
@@ -1391,7 +1390,7 @@
   std::shared_ptr<MoqtTrackPublisher> track =
       session_->publisher_->GetTrack(message.full_track_name);
   if (track == nullptr) {
-    SendRequestError(message.request_id, RequestErrorCode::kTrackDoesNotExist,
+    SendRequestError(message.request_id, RequestErrorCode::kDoesNotExist,
                      std::nullopt, "Track does not exist");
     return;
   }
@@ -1454,7 +1453,7 @@
     if (track_publisher == nullptr) {
       QUIC_DLOG(INFO) << ENDPOINT << "FETCH for " << track_name
                       << " rejected by the application: not found";
-      SendRequestError(message.request_id, RequestErrorCode::kTrackDoesNotExist,
+      SendRequestError(message.request_id, RequestErrorCode::kDoesNotExist,
                        std::nullopt, "not found");
       return;
     }
@@ -1650,7 +1649,7 @@
                 ? session_->RemoteTrackById(message.track_alias)
                 : session_->RemoteTrackByAlias(message.track_alias);
     if (track == nullptr) {
-      stream_->SendStopSending(kResetCodeCanceled);
+      stream_->SendStopSending(kResetCodeCancelled);
       // Received object for nonexistent track.
       return;
     }
@@ -1712,7 +1711,7 @@
     UpstreamFetch::UpstreamFetchTask* task = fetch->task();
     if (task == nullptr) {
       // The application killed the FETCH.
-      stream_->SendStopSending(kResetCodeCanceled);
+      stream_->SendStopSending(kResetCodeCancelled);
       return;
     }
     if (!task->HasObject()) {
@@ -1811,7 +1810,7 @@
                         << "Received object for a track with no SUBSCRIBE";
         // This is a not a session error because there might be an UNSUBSCRIBE
         // or SUBSCRIBE_OK (containing the track alias) in flight.
-        stream_->SendStopSending(kResetCodeCanceled);
+        stream_->SendStopSending(kResetCodeCancelled);
         return;
       }
       it->second->OnStreamOpened();
@@ -1826,7 +1825,7 @@
     QUIC_DLOG(INFO) << ENDPOINT << "Received object for a track with no FETCH";
     // This is a not a session error because there might be an UNSUBSCRIBE in
     // flight.
-    stream_->SendStopSending(kResetCodeCanceled);
+    stream_->SendStopSending(kResetCodeCancelled);
     return;
   }
   if (it->second == nullptr) {
@@ -1880,8 +1879,18 @@
 }
 
 MoqtSession::PublishedSubscription::~PublishedSubscription() {
-  session_->subscribed_track_names_.erase(track_publisher_->GetTrackName());
   track_publisher_->RemoveObjectListener(this);
+  if (session_->is_closing_) {
+    return;
+  }
+  session_->subscribed_track_names_.erase(track_publisher_->GetTrackName());
+  // Reset all streams.
+  for (const webtransport::StreamId stream_id : stream_map().GetAllStreams()) {
+    webtransport::Stream* stream = session_->session_->GetStreamById(stream_id);
+    if (stream != nullptr) {
+      stream->ResetWithUserCode(kResetCodeCancelled);
+    }
+  }
 }
 
 SendStreamMap& MoqtSession::PublishedSubscription::stream_map() {
@@ -2287,7 +2296,7 @@
 MoqtSession::OutgoingDataStream::GetSubscriptionIfValid() {
   auto it = session_->published_subscriptions_.find(subscription_id_);
   if (it == session_->published_subscriptions_.end()) {
-    stream_->ResetWithUserCode(kResetCodeCanceled);
+    stream_->ResetWithUserCode(kResetCodeCancelled);
     return nullptr;
   }
 
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index b134910..d7289dd 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -677,9 +677,9 @@
     void OnGroupAbandoned(uint64_t /*group_id*/) override {}
     void OnTrackPublisherGone() override {
       publisher_ = nullptr;
-      OnSubscribeRejected(
-          MoqtRequestErrorInfo(RequestErrorCode::kTrackDoesNotExist,
-                               std::nullopt, "Track publisher gone"));
+      OnSubscribeRejected(MoqtRequestErrorInfo(RequestErrorCode::kDoesNotExist,
+                                               std::nullopt,
+                                               "Track publisher gone"));
     }
 
    private:
@@ -871,8 +871,6 @@
   SessionNamespaceTree incoming_subscribe_namespace_;
   SessionNamespaceTree outgoing_subscribe_namespace_;
 
-  // The minimum request ID the peer can use that is monotonically increasing.
-  uint64_t next_incoming_request_id_ = 0;
   // The maximum request ID sent to the peer. Peer-generated IDs must be less
   // than this value.
   uint64_t local_max_request_id_ = 0;
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index e77fdcd..4f0feb7 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -649,10 +649,8 @@
 
   request.request_id = 3;
   request.parameters.subscription_filter.emplace(Location(12, 0));
-  EXPECT_CALL(mock_session_,
-              CloseSession(static_cast<uint64_t>(MoqtError::kProtocolViolation),
-                           "Duplicate subscribe for track"))
-      .Times(1);
+  EXPECT_CALL(mock_stream_,
+              Writev(ControlMessageOfType(MoqtMessageType::kRequestError), _));
   stream_input->OnSubscribeMessage(request);
 }
 
@@ -698,16 +696,15 @@
   MoqtSubscribe request = DefaultSubscribe();
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
       MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
-  // Request for track returns REQUEST_ERROR.
-  EXPECT_CALL(mock_stream_,
-              Writev(ControlMessageOfType(MoqtMessageType::kRequestError), _));
+  MockTrackPublisher* track = CreateTrackPublisher();
+  EXPECT_CALL(*track, AddObjectListener);
   stream_input->OnSubscribeMessage(request);
 
   // Second request is a protocol violation.
   request.full_track_name = FullTrackName({"dead", "beef"});
   EXPECT_CALL(mock_session_,
               CloseSession(static_cast<uint64_t>(MoqtError::kInvalidRequestId),
-                           "Request ID not monotonically increasing"));
+                           "Duplicate request ID"));
   stream_input->OnSubscribeMessage(request);
 }
 
@@ -893,8 +890,6 @@
   session_.GrantMoreRequests(1);
   // Peer subscribes to (0, 0)
   MoqtSubscribe request = DefaultSubscribe();
-  MoqtSessionPeer::set_next_incoming_request_id(
-      &session_, kDefaultInitialMaxRequestId + 1);
   request.request_id = kDefaultInitialMaxRequestId + 1;
   MockTrackPublisher* track = CreateTrackPublisher();
   ReceiveSubscribeSynchronousOk(track, request, stream_input.get());
@@ -1464,7 +1459,7 @@
       /*stream_count=*/1,
       /*error_reason=*/"",
   };
-  EXPECT_CALL(mock_stream_, ResetWithUserCode(kResetCodeCanceled));
+  EXPECT_CALL(mock_stream_, ResetWithUserCode(kResetCodeCancelled));
   webtransport::test::MockStream control_stream;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
       MoqtSessionPeer::CreateControlStream(&session_, &control_stream);
@@ -1532,7 +1527,7 @@
       /*stream_count=*/1,
       /*error_reason=*/"",
   };
-  EXPECT_CALL(mock_stream_, ResetWithUserCode(kResetCodeCanceled));
+  EXPECT_CALL(mock_stream_, ResetWithUserCode(kResetCodeCancelled));
   webtransport::test::MockStream control_stream;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
       MoqtSessionPeer::CreateControlStream(&session_, &control_stream);
@@ -2606,25 +2601,25 @@
                    MoqtFetchTask::GetNextObjectResult::kPending);
   stream_input->OnFetchMessage(fetch);
 
-  MoqtRequestError expected_error{fetch.request_id,
-                                  RequestErrorCode::kTrackDoesNotExist,
-                                  std::nullopt, "foo"};
+  MoqtRequestError expected_error{
+      fetch.request_id, RequestErrorCode::kDoesNotExist, std::nullopt, "foo"};
   EXPECT_CALL(mock_stream_,
               Writev(SerializedControlMessage(expected_error), _));
   fetch_task->CallFetchResponseCallback(expected_error);
 }
 
 TEST_F(MoqtSessionTest, InvalidFetch) {
-  // Update the state so that it expects ID > 0 next time.
-  MoqtSessionPeer::ValidateRequestId(&session_, 1);
   webtransport::test::MockStream control_stream;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
       MoqtSessionPeer::CreateControlStream(&session_, &control_stream);
+  MockTrackPublisher* track = CreateTrackPublisher();
   MoqtFetch fetch = DefaultFetch();
-  fetch.request_id = 1;  // Too low.
+  EXPECT_CALL(*track, StandaloneFetch)
+      .WillOnce(Return(std::make_unique<MockFetchTask>()));
+  stream_input->OnFetchMessage(fetch);
   EXPECT_CALL(mock_session_,
               CloseSession(static_cast<uint64_t>(MoqtError::kInvalidRequestId),
-                           "Request ID not monotonically increasing"))
+                           "Duplicate request ID"))
       .Times(1);
   stream_input->OnFetchMessage(fetch);
 }
@@ -4078,7 +4073,7 @@
   DeliverObject(object, /*fin=*/false, mock_session_, &mock_stream_,
                 data_stream, &remote_track_visitor_);
   // The data stream died and destroyed the visitor (IncomingDataStream).
-  data_stream->OnResetStreamReceived(kResetCodeCanceled);
+  data_stream->OnResetStreamReceived(kResetCodeCancelled);
   EXPECT_CALL(remote_track_visitor_, OnStreamReset(FullTrackName("foo", "bar"),
                                                    DataStreamIndex(0, 0)));
   data_stream.reset();
diff --git a/quiche/quic/moqt/moqt_track.cc b/quiche/quic/moqt/moqt_track.cc
index 25bf9b4..6b5b050 100644
--- a/quiche/quic/moqt/moqt_track.cc
+++ b/quiche/quic/moqt/moqt_track.cc
@@ -16,6 +16,7 @@
 #include "quiche/quic/core/quic_alarm.h"
 #include "quiche/quic/core/quic_clock.h"
 #include "quiche/quic/core/quic_time.h"
+#include "quiche/quic/moqt/moqt_error.h"
 #include "quiche/quic/moqt/moqt_fetch_task.h"
 #include "quiche/quic/moqt/moqt_key_value_pair.h"
 #include "quiche/quic/moqt/moqt_messages.h"
@@ -128,10 +129,11 @@
 }
 
 UpstreamFetch::~UpstreamFetch() {
-  if (task_.IsValid()) {
+  UpstreamFetchTask* task = task_.GetIfAvailable();
+  if (task != nullptr) {
     // Notify the task (which the application owns) that nothing more is coming.
     // If this has already been called, UpstreamFetchTask will ignore it.
-    task_.GetIfAvailable()->OnStreamAndFetchClosed(kResetCodeUnknown, "");
+    task->OnStreamAndFetchClosed(kResetCodeCancelled, "");
   }
 }
 
diff --git a/quiche/quic/moqt/test_tools/moqt_session_peer.h b/quiche/quic/moqt/test_tools/moqt_session_peer.h
index 403a346..6b0e117 100644
--- a/quiche/quic/moqt/test_tools/moqt_session_peer.h
+++ b/quiche/quic/moqt/test_tools/moqt_session_peer.h
@@ -153,10 +153,6 @@
     session->next_request_id_ = id;
   }
 
-  static void set_next_incoming_request_id(MoqtSession* session, uint64_t id) {
-    session->next_incoming_request_id_ = id;
-  }
-
   static void set_peer_max_request_id(MoqtSession* session, uint64_t id) {
     session->peer_max_request_id_ = id;
   }
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h
index ec708bf..ccf040e 100644
--- a/quiche/quic/moqt/test_tools/moqt_test_message.h
+++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -785,7 +785,7 @@
   uint8_t raw_packet_[11] = {
       0x05, 0x00, 0x08,
       0x02,                    // request_id = 2
-      0x05,                    // error_code = 5
+      0x11,                    // error_code = 17
       0x67, 0x11,              // retry_interval = 10000 ms
       0x03, 0x62, 0x61, 0x72,  // reason_phrase = "bar"
   };
diff --git a/quiche/quic/moqt/tools/chat_client.cc b/quiche/quic/moqt/tools/chat_client.cc
index a82406c..c408333 100644
--- a/quiche/quic/moqt/tools/chat_client.cc
+++ b/quiche/quic/moqt/tools/chat_client.cc
@@ -76,7 +76,7 @@
     std::cout << "PUBLISH_NAMESPACE rejected, invalid namespace\n";
     if (callback != nullptr) {
       std::move(callback)(std::make_optional<MoqtRequestErrorInfo>(
-          RequestErrorCode::kTrackDoesNotExist, std::nullopt,
+          RequestErrorCode::kDoesNotExist, std::nullopt,
           "Not a subscribed namespace"));
     }
     return;