Split MoqtSession::Stream into ControlStream, IncomingDataStream, and OutgoingDataStream.

This simplifies logic a bit, since we don't need to manually check stream type in every stream. It also should make easier adding backpressure handling for data streams in future CLs.

We might get further simplification by splitting MoqtParser into two (one for control stream, one for data), but that's out of scope for this CL.

PiperOrigin-RevId: 651457331
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 6832f25..023bdcc 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -41,7 +41,7 @@
 
 using ::quic::Perspective;
 
-MoqtSession::Stream* MoqtSession::GetControlStream() {
+MoqtSession::ControlStream* MoqtSession::GetControlStream() {
   if (!control_stream_.has_value()) {
     return nullptr;
   }
@@ -49,11 +49,11 @@
   if (raw_stream == nullptr) {
     return nullptr;
   }
-  return static_cast<Stream*>(raw_stream->visitor());
+  return static_cast<ControlStream*>(raw_stream->visitor());
 }
 
 void MoqtSession::SendControlMessage(quiche::QuicheBuffer message) {
-  Stream* control_stream = GetControlStream();
+  ControlStream* control_stream = GetControlStream();
   if (control_stream == nullptr) {
     QUICHE_LOG(DFATAL) << "Trying to send a message on the control stream "
                           "while it does not exist";
@@ -74,8 +74,8 @@
     Error(MoqtError::kInternalError, "Unable to open a control stream");
     return;
   }
-  control_stream->SetVisitor(std::make_unique<Stream>(
-      this, control_stream, /*is_control_stream=*/true));
+  control_stream->SetVisitor(
+      std::make_unique<ControlStream>(this, control_stream));
   control_stream_ = control_stream->GetStreamId();
   MoqtClientSetup setup = MoqtClientSetup{
       .supported_versions = std::vector<MoqtVersion>{parameters_.version},
@@ -107,14 +107,14 @@
       Error(MoqtError::kProtocolViolation, "Bidirectional stream already open");
       return;
     }
-    stream->SetVisitor(std::make_unique<Stream>(this, stream));
+    stream->SetVisitor(std::make_unique<ControlStream>(this, stream));
     stream->visitor()->OnCanRead();
   }
 }
 void MoqtSession::OnIncomingUnidirectionalStreamAvailable() {
   while (webtransport::Stream* stream =
              session_->AcceptIncomingUnidirectionalStream()) {
-    stream->SetVisitor(std::make_unique<Stream>(this, stream));
+    stream->SetVisitor(std::make_unique<IncomingDataStream>(this, stream));
     stream->visitor()->OnCanRead();
   }
 }
@@ -376,7 +376,8 @@
   if (new_stream == nullptr) {
     return std::nullopt;
   }
-  new_stream->SetVisitor(std::make_unique<Stream>(this, new_stream, false));
+  new_stream->SetVisitor(
+      std::make_unique<OutgoingDataStream>(this, new_stream));
   return new_stream->GetStreamId();
 }
 
@@ -572,82 +573,45 @@
   }
 }
 
-void MoqtSession::Stream::OnCanRead() {
+static void ForwardStreamDataToParser(webtransport::Stream& stream,
+                                      MoqtParser& parser) {
   bool fin =
-      quiche::ProcessAllReadableRegions(*stream_, [&](absl::string_view chunk) {
-        parser_.ProcessData(chunk, /*end_of_stream=*/false);
+      quiche::ProcessAllReadableRegions(stream, [&](absl::string_view chunk) {
+        parser.ProcessData(chunk, /*end_of_stream=*/false);
       });
   if (fin) {
-    parser_.ProcessData("", /*end_of_stream=*/true);
-  }
-}
-void MoqtSession::Stream::OnCanWrite() {}
-void MoqtSession::Stream::OnResetStreamReceived(
-    webtransport::StreamErrorCode error) {
-  if (is_control_stream_.has_value() && *is_control_stream_) {
-    session_->Error(
-        MoqtError::kProtocolViolation,
-        absl::StrCat("Control stream reset with error code ", error));
-  }
-}
-void MoqtSession::Stream::OnStopSendingReceived(
-    webtransport::StreamErrorCode error) {
-  if (is_control_stream_.has_value() && *is_control_stream_) {
-    session_->Error(
-        MoqtError::kProtocolViolation,
-        absl::StrCat("Control stream reset with error code ", error));
+    parser.ProcessData("", /*end_of_stream=*/true);
   }
 }
 
-void MoqtSession::Stream::OnObjectMessage(const MoqtObject& message,
-                                          absl::string_view payload,
-                                          bool end_of_message) {
-  if (is_control_stream_ == true) {
-    session_->Error(MoqtError::kProtocolViolation,
-                    "Received OBJECT message on control stream");
-    return;
-  }
-  QUICHE_DVLOG(1)
-      << ENDPOINT << "Received OBJECT message on stream "
-      << stream_->GetStreamId() << " for subscribe_id " << message.subscribe_id
-      << " for track alias " << message.track_alias << " with sequence "
-      << message.group_id << ":" << message.object_id << " send_order "
-      << message.object_send_order << " forwarding_preference "
-      << MoqtForwardingPreferenceToString(message.forwarding_preference)
-      << " length " << payload.size() << " explicit length "
-      << (message.payload_length.has_value() ? (int)*message.payload_length
-                                             : -1)
-      << (end_of_message ? "F" : "");
-  if (!session_->parameters_.deliver_partial_objects) {
-    if (!end_of_message) {  // Buffer partial object.
-      absl::StrAppend(&partial_object_, payload);
-      return;
-    }
-    if (!partial_object_.empty()) {  // Completes the object
-      absl::StrAppend(&partial_object_, payload);
-      payload = absl::string_view(partial_object_);
-    }
-  }
-  auto [full_track_name, visitor] = session_->TrackPropertiesFromAlias(message);
-  if (visitor != nullptr) {
-    visitor->OnObjectFragment(
-        full_track_name, message.group_id, message.object_id,
-        message.object_send_order, message.object_status,
-        message.forwarding_preference, payload, end_of_message);
-  }
-  partial_object_.clear();
+void MoqtSession::ControlStream::OnCanRead() {
+  ForwardStreamDataToParser(*stream_, parser_);
+}
+void MoqtSession::ControlStream::OnCanWrite() {
+  // We buffer serialized control frames unconditionally, thus OnCanWrite()
+  // requires no handling for control streams.
 }
 
-void MoqtSession::Stream::OnClientSetupMessage(const MoqtClientSetup& message) {
-  if (is_control_stream_.has_value()) {
-    if (!*is_control_stream_) {
-      session_->Error(MoqtError::kProtocolViolation,
-                      "Received SETUP on non-control stream");
-      return;
-    }
-  } else {
-    is_control_stream_ = true;
-  }
+void MoqtSession::ControlStream::OnResetStreamReceived(
+    webtransport::StreamErrorCode error) {
+  session_->Error(MoqtError::kProtocolViolation,
+                  absl::StrCat("Control stream reset with error code ", error));
+}
+void MoqtSession::ControlStream::OnStopSendingReceived(
+    webtransport::StreamErrorCode error) {
+  session_->Error(MoqtError::kProtocolViolation,
+                  absl::StrCat("Control stream reset with error code ", error));
+}
+
+void MoqtSession::ControlStream::OnObjectMessage(const MoqtObject& message,
+                                                 absl::string_view payload,
+                                                 bool end_of_message) {
+  session_->Error(MoqtError::kProtocolViolation,
+                  "Received OBJECT message on control stream");
+}
+
+void MoqtSession::ControlStream::OnClientSetupMessage(
+    const MoqtClientSetup& message) {
   session_->control_stream_ = stream_->GetStreamId();
   if (perspective() == Perspective::IS_CLIENT) {
     session_->Error(MoqtError::kProtocolViolation,
@@ -675,16 +639,8 @@
   session_->peer_role_ = *message.role;
 }
 
-void MoqtSession::Stream::OnServerSetupMessage(const MoqtServerSetup& message) {
-  if (is_control_stream_.has_value()) {
-    if (!*is_control_stream_) {
-      session_->Error(MoqtError::kProtocolViolation,
-                      "Received SETUP on non-control stream");
-      return;
-    }
-  } else {
-    is_control_stream_ = true;
-  }
+void MoqtSession::ControlStream::OnServerSetupMessage(
+    const MoqtServerSetup& message) {
   if (perspective() == Perspective::IS_SERVER) {
     session_->Error(MoqtError::kProtocolViolation,
                     "Received SERVER_SETUP from client");
@@ -703,10 +659,9 @@
   session_->peer_role_ = *message.role;
 }
 
-void MoqtSession::Stream::SendSubscribeError(const MoqtSubscribe& message,
-                                             SubscribeErrorCode error_code,
-                                             absl::string_view reason_phrase,
-                                             uint64_t track_alias) {
+void MoqtSession::ControlStream::SendSubscribeError(
+    const MoqtSubscribe& message, SubscribeErrorCode error_code,
+    absl::string_view reason_phrase, uint64_t track_alias) {
   MoqtSubscribeError subscribe_error;
   subscribe_error.subscribe_id = message.subscribe_id;
   subscribe_error.error_code = error_code;
@@ -716,11 +671,9 @@
       session_->framer_.SerializeSubscribeError(subscribe_error));
 }
 
-void MoqtSession::Stream::OnSubscribeMessage(const MoqtSubscribe& message) {
+void MoqtSession::ControlStream::OnSubscribeMessage(
+    const MoqtSubscribe& message) {
   std::string reason_phrase = "";
-  if (!CheckIfIsControlStream()) {
-    return;
-  }
   if (session_->peer_role_ == MoqtRole::kPublisher) {
     QUIC_DLOG(INFO) << ENDPOINT << "Publisher peer sent SUBSCRIBE";
     session_->Error(MoqtError::kProtocolViolation,
@@ -825,10 +778,8 @@
   }
 }
 
-void MoqtSession::Stream::OnSubscribeOkMessage(const MoqtSubscribeOk& message) {
-  if (!CheckIfIsControlStream()) {
-    return;
-  }
+void MoqtSession::ControlStream::OnSubscribeOkMessage(
+    const MoqtSubscribeOk& message) {
   auto it = session_->active_subscribes_.find(message.subscribe_id);
   if (it == session_->active_subscribes_.end()) {
     session_->Error(MoqtError::kProtocolViolation,
@@ -860,11 +811,8 @@
   session_->active_subscribes_.erase(it);
 }
 
-void MoqtSession::Stream::OnSubscribeErrorMessage(
+void MoqtSession::ControlStream::OnSubscribeErrorMessage(
     const MoqtSubscribeError& message) {
-  if (!CheckIfIsControlStream()) {
-    return;
-  }
   auto it = session_->active_subscribes_.find(message.subscribe_id);
   if (it == session_->active_subscribes_.end()) {
     session_->Error(MoqtError::kProtocolViolation,
@@ -894,12 +842,13 @@
   session_->active_subscribes_.erase(it);
 }
 
-void MoqtSession::Stream::OnUnsubscribeMessage(const MoqtUnsubscribe& message) {
+void MoqtSession::ControlStream::OnUnsubscribeMessage(
+    const MoqtUnsubscribe& message) {
   session_->SubscribeIsDone(message.subscribe_id,
                             SubscribeDoneCode::kUnsubscribed, "");
 }
 
-void MoqtSession::Stream::OnSubscribeUpdateMessage(
+void MoqtSession::ControlStream::OnSubscribeUpdateMessage(
     const MoqtSubscribeUpdate& message) {
   // Search all the tracks to find the subscribe ID.
   auto name_it =
@@ -934,16 +883,14 @@
   }
 }
 
-void MoqtSession::Stream::OnAnnounceMessage(const MoqtAnnounce& message) {
+void MoqtSession::ControlStream::OnAnnounceMessage(
+    const MoqtAnnounce& message) {
   if (session_->peer_role_ == MoqtRole::kSubscriber) {
     QUIC_DLOG(INFO) << ENDPOINT << "Subscriber peer sent SUBSCRIBE";
     session_->Error(MoqtError::kProtocolViolation,
                     "Received ANNOUNCE from Subscriber");
     return;
   }
-  if (!CheckIfIsControlStream()) {
-    return;
-  }
   std::optional<MoqtAnnounceErrorReason> error =
       session_->callbacks_.incoming_announce_callback(message.track_namespace);
   if (error.has_value()) {
@@ -959,10 +906,8 @@
   SendOrBufferMessage(session_->framer_.SerializeAnnounceOk(ok));
 }
 
-void MoqtSession::Stream::OnAnnounceOkMessage(const MoqtAnnounceOk& message) {
-  if (!CheckIfIsControlStream()) {
-    return;
-  }
+void MoqtSession::ControlStream::OnAnnounceOkMessage(
+    const MoqtAnnounceOk& message) {
   auto it = session_->pending_outgoing_announces_.find(message.track_namespace);
   if (it == session_->pending_outgoing_announces_.end()) {
     session_->Error(MoqtError::kProtocolViolation,
@@ -973,11 +918,8 @@
   session_->pending_outgoing_announces_.erase(it);
 }
 
-void MoqtSession::Stream::OnAnnounceErrorMessage(
+void MoqtSession::ControlStream::OnAnnounceErrorMessage(
     const MoqtAnnounceError& message) {
-  if (!CheckIfIsControlStream()) {
-    return;
-  }
   auto it = session_->pending_outgoing_announces_.find(message.track_namespace);
   if (it == session_->pending_outgoing_announces_.end()) {
     session_->Error(MoqtError::kProtocolViolation,
@@ -991,34 +933,23 @@
   session_->pending_outgoing_announces_.erase(it);
 }
 
-void MoqtSession::Stream::OnAnnounceCancelMessage(
+void MoqtSession::ControlStream::OnAnnounceCancelMessage(
     const MoqtAnnounceCancel& message) {
   session_->CancelAnnounce(message.track_namespace);
 }
 
-void MoqtSession::Stream::OnParsingError(MoqtError error_code,
-                                         absl::string_view reason) {
+void MoqtSession::ControlStream::OnParsingError(MoqtError error_code,
+                                                absl::string_view reason) {
   session_->Error(error_code, absl::StrCat("Parse error: ", reason));
 }
 
-bool MoqtSession::Stream::CheckIfIsControlStream() {
-  if (!is_control_stream_.has_value()) {
-    session_->Error(MoqtError::kProtocolViolation,
-                    "Received SUBSCRIBE_REQUEST as first message");
-    return false;
-  }
-  if (!*is_control_stream_) {
-    session_->Error(MoqtError::kProtocolViolation,
-                    "Received SUBSCRIBE_REQUEST on non-control stream");
-    return false;
-  }
-  return true;
-}
-
-void MoqtSession::Stream::SendOrBufferMessage(quiche::QuicheBuffer message,
-                                              bool fin) {
+void MoqtSession::ControlStream::SendOrBufferMessage(
+    quiche::QuicheBuffer message, bool fin) {
   quiche::StreamWriteOptions options;
   options.set_send_fin(fin);
+  // TODO: while we buffer unconditionally, we should still at some point tear
+  // down the connection if we've buffered too many control messages; otherwise,
+  // there is potential for memory exhaustion attacks.
   options.set_buffer_unconditionally(true);
   std::array<absl::string_view, 1> write_vector = {message.AsStringView()};
   absl::Status success = stream_->Writev(absl::MakeSpan(write_vector), options);
@@ -1028,4 +959,56 @@
   }
 }
 
+void MoqtSession::IncomingDataStream::OnObjectMessage(const MoqtObject& message,
+                                                      absl::string_view payload,
+                                                      bool end_of_message) {
+  QUICHE_DVLOG(1)
+      << ENDPOINT << "Received OBJECT message on stream "
+      << stream_->GetStreamId() << " for subscribe_id " << message.subscribe_id
+      << " for track alias " << message.track_alias << " with sequence "
+      << message.group_id << ":" << message.object_id << " send_order "
+      << message.object_send_order << " forwarding_preference "
+      << MoqtForwardingPreferenceToString(message.forwarding_preference)
+      << " length " << payload.size() << " explicit length "
+      << (message.payload_length.has_value() ? (int)*message.payload_length
+                                             : -1)
+      << (end_of_message ? "F" : "");
+  if (!session_->parameters_.deliver_partial_objects) {
+    if (!end_of_message) {  // Buffer partial object.
+      absl::StrAppend(&partial_object_, payload);
+      return;
+    }
+    if (!partial_object_.empty()) {  // Completes the object
+      absl::StrAppend(&partial_object_, payload);
+      payload = absl::string_view(partial_object_);
+    }
+  }
+  auto [full_track_name, visitor] = session_->TrackPropertiesFromAlias(message);
+  if (visitor != nullptr) {
+    visitor->OnObjectFragment(
+        full_track_name, message.group_id, message.object_id,
+        message.object_send_order, message.object_status,
+        message.forwarding_preference, payload, end_of_message);
+  }
+  partial_object_.clear();
+}
+
+void MoqtSession::IncomingDataStream::OnCanRead() {
+  ForwardStreamDataToParser(*stream_, parser_);
+}
+
+void MoqtSession::IncomingDataStream::OnControlMessageReceived() {
+  session_->Error(MoqtError::kProtocolViolation,
+                  "Received a control message on a data stream");
+}
+
+void MoqtSession::IncomingDataStream::OnParsingError(MoqtError error_code,
+                                                     absl::string_view reason) {
+  session_->Error(error_code, absl::StrCat("Parse error: ", reason));
+}
+
+void MoqtSession::OutgoingDataStream::OnCanWrite() {
+  // TODO: handle backpressure on data streams.
+}
+
 }  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index 9d88da9..b4a0171 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -149,19 +149,14 @@
 
  private:
   friend class test::MoqtSessionPeer;
-  class QUICHE_EXPORT Stream : public webtransport::StreamVisitor,
-                               public MoqtParserVisitor {
+
+  class QUICHE_EXPORT ControlStream : public webtransport::StreamVisitor,
+                                      public MoqtParserVisitor {
    public:
-    Stream(MoqtSession* session, webtransport::Stream* stream)
+    ControlStream(MoqtSession* session, webtransport::Stream* stream)
         : session_(session),
           stream_(stream),
           parser_(session->parameters_.using_webtrans, *this) {}
-    Stream(MoqtSession* session, webtransport::Stream* stream,
-           bool is_control_stream)
-        : session_(session),
-          stream_(stream),
-          parser_(session->parameters_.using_webtrans, *this),
-          is_control_stream_(is_control_stream) {}
 
     // webtransport::StreamVisitor implementation.
     void OnCanRead() override;
@@ -212,22 +207,123 @@
                             SubscribeErrorCode error_code,
                             absl::string_view reason_phrase,
                             uint64_t track_alias);
-    bool CheckIfIsControlStream();
 
     MoqtSession* session_;
     webtransport::Stream* stream_;
     MoqtParser parser_;
-    // nullopt means "incoming stream, and we don't know if it's the control
-    // stream or a data stream yet".
-    std::optional<bool> is_control_stream_;
+  };
+  class QUICHE_EXPORT IncomingDataStream : public webtransport::StreamVisitor,
+                                           public MoqtParserVisitor {
+   public:
+    IncomingDataStream(MoqtSession* session, webtransport::Stream* stream)
+        : session_(session),
+          stream_(stream),
+          parser_(session->parameters_.using_webtrans, *this) {}
+
+    // webtransport::StreamVisitor implementation.
+    void OnCanRead() override;
+    void OnCanWrite() override {}
+    void OnResetStreamReceived(webtransport::StreamErrorCode error) override {}
+    void OnStopSendingReceived(webtransport::StreamErrorCode error) override {}
+    void OnWriteSideInDataRecvdState() override {}
+
+    // MoqtParserVisitor implementation.
+    // TODO: Handle a stream FIN.
+    void OnObjectMessage(const MoqtObject& message, absl::string_view payload,
+                         bool end_of_message) override;
+    void OnClientSetupMessage(const MoqtClientSetup&) override {
+      OnControlMessageReceived();
+    }
+    void OnServerSetupMessage(const MoqtServerSetup&) override {
+      OnControlMessageReceived();
+    }
+    void OnSubscribeMessage(const MoqtSubscribe&) override {
+      OnControlMessageReceived();
+    }
+    void OnSubscribeOkMessage(const MoqtSubscribeOk&) override {
+      OnControlMessageReceived();
+    }
+    void OnSubscribeErrorMessage(const MoqtSubscribeError&) override {
+      OnControlMessageReceived();
+    }
+    void OnUnsubscribeMessage(const MoqtUnsubscribe&) override {
+      OnControlMessageReceived();
+    }
+    void OnSubscribeDoneMessage(const MoqtSubscribeDone&) override {
+      OnControlMessageReceived();
+    }
+    void OnSubscribeUpdateMessage(const MoqtSubscribeUpdate&) override {
+      OnControlMessageReceived();
+    }
+    void OnAnnounceMessage(const MoqtAnnounce&) override {
+      OnControlMessageReceived();
+    }
+    void OnAnnounceOkMessage(const MoqtAnnounceOk&) override {
+      OnControlMessageReceived();
+    }
+    void OnAnnounceErrorMessage(const MoqtAnnounceError&) override {
+      OnControlMessageReceived();
+    }
+    void OnAnnounceCancelMessage(const MoqtAnnounceCancel& message) override {
+      OnControlMessageReceived();
+    }
+    void OnTrackStatusRequestMessage(
+        const MoqtTrackStatusRequest& message) override {
+      OnControlMessageReceived();
+    }
+    void OnUnannounceMessage(const MoqtUnannounce&) override {
+      OnControlMessageReceived();
+    }
+    void OnTrackStatusMessage(const MoqtTrackStatus&) override {
+      OnControlMessageReceived();
+    }
+    void OnGoAwayMessage(const MoqtGoAway&) override {
+      OnControlMessageReceived();
+    }
+    void OnParsingError(MoqtError error_code,
+                        absl::string_view reason) override;
+
+    quic::Perspective perspective() const {
+      return session_->parameters_.perspective;
+    }
+
+    webtransport::Stream* stream() const { return stream_; }
+
+   private:
+    friend class test::MoqtSessionPeer;
+    void OnControlMessageReceived();
+
+    MoqtSession* session_;
+    webtransport::Stream* stream_;
+    MoqtParser parser_;
     std::string partial_object_;
   };
+  class QUICHE_EXPORT OutgoingDataStream : public webtransport::StreamVisitor {
+   public:
+    OutgoingDataStream(MoqtSession* session, webtransport::Stream* stream)
+        : session_(session), stream_(stream) {}
+
+    // webtransport::StreamVisitor implementation.
+    void OnCanRead() override {}
+    void OnCanWrite() override;
+    void OnResetStreamReceived(webtransport::StreamErrorCode error) override {}
+    void OnStopSendingReceived(webtransport::StreamErrorCode error) override {}
+    void OnWriteSideInDataRecvdState() override {}
+
+    webtransport::Stream* stream() const { return stream_; }
+
+   private:
+    friend class test::MoqtSessionPeer;
+
+    MoqtSession* session_;
+    webtransport::Stream* stream_;
+  };
 
   // Returns true if SUBSCRIBE_DONE was sent.
   bool SubscribeIsDone(uint64_t subscribe_id, SubscribeDoneCode code,
                        absl::string_view reason_phrase);
   // Returns the pointer to the control stream, or nullptr if none is present.
-  Stream* GetControlStream();
+  ControlStream* GetControlStream();
   // Sends a message on the control stream; QUICHE_DCHECKs if no control stream
   // is present.
   void SendControlMessage(quiche::QuicheBuffer message);
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index a002462..f8e64e8 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -66,8 +66,8 @@
  public:
   static std::unique_ptr<MoqtParserVisitor> CreateControlStream(
       MoqtSession* session, webtransport::test::MockStream* stream) {
-    auto new_stream = std::make_unique<MoqtSession::Stream>(
-        session, stream, /*is_control_stream=*/true);
+    auto new_stream =
+        std::make_unique<MoqtSession::ControlStream>(session, stream);
     session->control_stream_ = kControlStreamId;
     EXPECT_CALL(*stream, visitor())
         .Times(AnyNumber())
@@ -75,10 +75,10 @@
     return new_stream;
   }
 
-  static std::unique_ptr<MoqtParserVisitor> CreateUniStream(
+  static std::unique_ptr<MoqtParserVisitor> CreateIncomingDataStream(
       MoqtSession* session, webtransport::Stream* stream) {
-    auto new_stream = std::make_unique<MoqtSession::Stream>(
-        session, stream, /*is_control_stream=*/false);
+    auto new_stream =
+        std::make_unique<MoqtSession::IncomingDataStream>(session, stream);
     return new_stream;
   }
 
@@ -91,7 +91,7 @@
   // stream created by the MoqtSession.
   static MoqtParserVisitor* FetchParserVisitorFromWebtransportStreamVisitor(
       MoqtSession* session, webtransport::StreamVisitor* visitor) {
-    return (MoqtSession::Stream*)visitor;
+    return (MoqtSession::ControlStream*)visitor;
   }
 
   static void CreateRemoteTrack(MoqtSession* session, const FullTrackName& name,
@@ -581,7 +581,7 @@
   };
   StrictMock<webtransport::test::MockStream> mock_stream;
   std::unique_ptr<MoqtParserVisitor> object_stream =
-      MoqtSessionPeer::CreateUniStream(&session_, &mock_stream);
+      MoqtSessionPeer::CreateIncomingDataStream(&session_, &mock_stream);
 
   EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _, _, _)).Times(1);
   EXPECT_CALL(mock_stream, GetStreamId())
@@ -606,7 +606,7 @@
   };
   StrictMock<webtransport::test::MockStream> mock_stream;
   std::unique_ptr<MoqtParserVisitor> object_stream =
-      MoqtSessionPeer::CreateUniStream(&session_, &mock_stream);
+      MoqtSessionPeer::CreateIncomingDataStream(&session_, &mock_stream);
 
   EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _, _, _)).Times(1);
   EXPECT_CALL(mock_stream, GetStreamId())
@@ -641,7 +641,7 @@
   };
   StrictMock<webtransport::test::MockStream> mock_stream;
   std::unique_ptr<MoqtParserVisitor> object_stream =
-      MoqtSessionPeer::CreateUniStream(&session, &mock_stream);
+      MoqtSessionPeer::CreateIncomingDataStream(&session, &mock_stream);
 
   EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _, _, _)).Times(2);
   EXPECT_CALL(mock_stream, GetStreamId())
@@ -677,7 +677,7 @@
   };
   StrictMock<webtransport::test::MockStream> mock_stream;
   std::unique_ptr<MoqtParserVisitor> object_stream =
-      MoqtSessionPeer::CreateUniStream(&session_, &mock_stream);
+      MoqtSessionPeer::CreateIncomingDataStream(&session_, &mock_stream);
 
   EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _, _, _))
       .WillOnce([&](const FullTrackName& full_track_name,
@@ -733,7 +733,7 @@
   };
   StrictMock<webtransport::test::MockStream> mock_stream;
   std::unique_ptr<MoqtParserVisitor> object_stream =
-      MoqtSessionPeer::CreateUniStream(&session_, &mock_stream);
+      MoqtSessionPeer::CreateIncomingDataStream(&session_, &mock_stream);
 
   EXPECT_CALL(visitor, OnObjectFragment(_, _, _, _, _, _, _, _))
       .WillOnce([&](const FullTrackName& full_track_name,
@@ -793,7 +793,7 @@
   };
   StrictMock<webtransport::test::MockStream> mock_stream;
   std::unique_ptr<MoqtParserVisitor> object_stream =
-      MoqtSessionPeer::CreateUniStream(&session_, &mock_stream);
+      MoqtSessionPeer::CreateIncomingDataStream(&session_, &mock_stream);
 
   EXPECT_CALL(visitor, OnObjectFragment(_, _, _, _, _, _, _, _))
       .WillOnce([&](const FullTrackName& full_track_name,
@@ -844,7 +844,7 @@
   };
   StrictMock<webtransport::test::MockStream> mock_stream;
   std::unique_ptr<MoqtParserVisitor> object_stream =
-      MoqtSessionPeer::CreateUniStream(&session_, &mock_stream);
+      MoqtSessionPeer::CreateIncomingDataStream(&session_, &mock_stream);
 
   EXPECT_CALL(visitor, OnObjectFragment(_, _, _, _, _, _, _, _))
       .WillOnce([&](const FullTrackName& full_track_name,
@@ -881,7 +881,7 @@
   control_stream->OnSubscribeOkMessage(ok);
 }
 
-TEST_F(MoqtSessionTest, CreateUniStreamAndSend) {
+TEST_F(MoqtSessionTest, CreateIncomingDataStreamAndSend) {
   StrictMock<webtransport::test::MockStream> mock_stream;
   FullTrackName ftn("foo", "bar");
   MockLocalTrackVisitor track_visitor;
@@ -1174,7 +1174,7 @@
   };
   StrictMock<webtransport::test::MockStream> mock_stream;
   std::unique_ptr<MoqtParserVisitor> object_stream =
-      MoqtSessionPeer::CreateUniStream(&session_, &mock_stream);
+      MoqtSessionPeer::CreateIncomingDataStream(&session_, &mock_stream);
 
   EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _, _, _)).Times(1);
   EXPECT_CALL(mock_stream, GetStreamId())