diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 6832f25..023bdcc 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -41,7 +41,7 @@
 
 using ::quic::Perspective;
 
-MoqtSession::Stream* MoqtSession::GetControlStream() {
+MoqtSession::ControlStream* MoqtSession::GetControlStream() {
   if (!control_stream_.has_value()) {
     return nullptr;
   }
@@ -49,11 +49,11 @@
   if (raw_stream == nullptr) {
     return nullptr;
   }
-  return static_cast<Stream*>(raw_stream->visitor());
+  return static_cast<ControlStream*>(raw_stream->visitor());
 }
 
 void MoqtSession::SendControlMessage(quiche::QuicheBuffer message) {
-  Stream* control_stream = GetControlStream();
+  ControlStream* control_stream = GetControlStream();
   if (control_stream == nullptr) {
     QUICHE_LOG(DFATAL) << "Trying to send a message on the control stream "
                           "while it does not exist";
@@ -74,8 +74,8 @@
     Error(MoqtError::kInternalError, "Unable to open a control stream");
     return;
   }
-  control_stream->SetVisitor(std::make_unique<Stream>(
-      this, control_stream, /*is_control_stream=*/true));
+  control_stream->SetVisitor(
+      std::make_unique<ControlStream>(this, control_stream));
   control_stream_ = control_stream->GetStreamId();
   MoqtClientSetup setup = MoqtClientSetup{
       .supported_versions = std::vector<MoqtVersion>{parameters_.version},
@@ -107,14 +107,14 @@
       Error(MoqtError::kProtocolViolation, "Bidirectional stream already open");
       return;
     }
-    stream->SetVisitor(std::make_unique<Stream>(this, stream));
+    stream->SetVisitor(std::make_unique<ControlStream>(this, stream));
     stream->visitor()->OnCanRead();
   }
 }
 void MoqtSession::OnIncomingUnidirectionalStreamAvailable() {
   while (webtransport::Stream* stream =
              session_->AcceptIncomingUnidirectionalStream()) {
-    stream->SetVisitor(std::make_unique<Stream>(this, stream));
+    stream->SetVisitor(std::make_unique<IncomingDataStream>(this, stream));
     stream->visitor()->OnCanRead();
   }
 }
@@ -376,7 +376,8 @@
   if (new_stream == nullptr) {
     return std::nullopt;
   }
-  new_stream->SetVisitor(std::make_unique<Stream>(this, new_stream, false));
+  new_stream->SetVisitor(
+      std::make_unique<OutgoingDataStream>(this, new_stream));
   return new_stream->GetStreamId();
 }
 
@@ -572,82 +573,45 @@
   }
 }
 
-void MoqtSession::Stream::OnCanRead() {
+static void ForwardStreamDataToParser(webtransport::Stream& stream,
+                                      MoqtParser& parser) {
   bool fin =
-      quiche::ProcessAllReadableRegions(*stream_, [&](absl::string_view chunk) {
-        parser_.ProcessData(chunk, /*end_of_stream=*/false);
+      quiche::ProcessAllReadableRegions(stream, [&](absl::string_view chunk) {
+        parser.ProcessData(chunk, /*end_of_stream=*/false);
       });
   if (fin) {
-    parser_.ProcessData("", /*end_of_stream=*/true);
-  }
-}
-void MoqtSession::Stream::OnCanWrite() {}
-void MoqtSession::Stream::OnResetStreamReceived(
-    webtransport::StreamErrorCode error) {
-  if (is_control_stream_.has_value() && *is_control_stream_) {
-    session_->Error(
-        MoqtError::kProtocolViolation,
-        absl::StrCat("Control stream reset with error code ", error));
-  }
-}
-void MoqtSession::Stream::OnStopSendingReceived(
-    webtransport::StreamErrorCode error) {
-  if (is_control_stream_.has_value() && *is_control_stream_) {
-    session_->Error(
-        MoqtError::kProtocolViolation,
-        absl::StrCat("Control stream reset with error code ", error));
+    parser.ProcessData("", /*end_of_stream=*/true);
   }
 }
 
-void MoqtSession::Stream::OnObjectMessage(const MoqtObject& message,
-                                          absl::string_view payload,
-                                          bool end_of_message) {
-  if (is_control_stream_ == true) {
-    session_->Error(MoqtError::kProtocolViolation,
-                    "Received OBJECT message on control stream");
-    return;
-  }
-  QUICHE_DVLOG(1)
-      << ENDPOINT << "Received OBJECT message on stream "
-      << stream_->GetStreamId() << " for subscribe_id " << message.subscribe_id
-      << " for track alias " << message.track_alias << " with sequence "
-      << message.group_id << ":" << message.object_id << " send_order "
-      << message.object_send_order << " forwarding_preference "
-      << MoqtForwardingPreferenceToString(message.forwarding_preference)
-      << " length " << payload.size() << " explicit length "
-      << (message.payload_length.has_value() ? (int)*message.payload_length
-                                             : -1)
-      << (end_of_message ? "F" : "");
-  if (!session_->parameters_.deliver_partial_objects) {
-    if (!end_of_message) {  // Buffer partial object.
-      absl::StrAppend(&partial_object_, payload);
-      return;
-    }
-    if (!partial_object_.empty()) {  // Completes the object
-      absl::StrAppend(&partial_object_, payload);
-      payload = absl::string_view(partial_object_);
-    }
-  }
-  auto [full_track_name, visitor] = session_->TrackPropertiesFromAlias(message);
-  if (visitor != nullptr) {
-    visitor->OnObjectFragment(
-        full_track_name, message.group_id, message.object_id,
-        message.object_send_order, message.object_status,
-        message.forwarding_preference, payload, end_of_message);
-  }
-  partial_object_.clear();
+void MoqtSession::ControlStream::OnCanRead() {
+  ForwardStreamDataToParser(*stream_, parser_);
+}
+void MoqtSession::ControlStream::OnCanWrite() {
+  // We buffer serialized control frames unconditionally, thus OnCanWrite()
+  // requires no handling for control streams.
 }
 
-void MoqtSession::Stream::OnClientSetupMessage(const MoqtClientSetup& message) {
-  if (is_control_stream_.has_value()) {
-    if (!*is_control_stream_) {
-      session_->Error(MoqtError::kProtocolViolation,
-                      "Received SETUP on non-control stream");
-      return;
-    }
-  } else {
-    is_control_stream_ = true;
-  }
+void MoqtSession::ControlStream::OnResetStreamReceived(
+    webtransport::StreamErrorCode error) {
+  session_->Error(MoqtError::kProtocolViolation,
+                  absl::StrCat("Control stream reset with error code ", error));
+}
+void MoqtSession::ControlStream::OnStopSendingReceived(
+    webtransport::StreamErrorCode error) {
+  session_->Error(MoqtError::kProtocolViolation,
+                  absl::StrCat("Control stream reset with error code ", error));
+}
+
+void MoqtSession::ControlStream::OnObjectMessage(const MoqtObject& message,
+                                                 absl::string_view payload,
+                                                 bool end_of_message) {
+  session_->Error(MoqtError::kProtocolViolation,
+                  "Received OBJECT message on control stream");
+}
+
+void MoqtSession::ControlStream::OnClientSetupMessage(
+    const MoqtClientSetup& message) {
   session_->control_stream_ = stream_->GetStreamId();
   if (perspective() == Perspective::IS_CLIENT) {
     session_->Error(MoqtError::kProtocolViolation,
@@ -675,16 +639,8 @@
   session_->peer_role_ = *message.role;
 }
 
-void MoqtSession::Stream::OnServerSetupMessage(const MoqtServerSetup& message) {
-  if (is_control_stream_.has_value()) {
-    if (!*is_control_stream_) {
-      session_->Error(MoqtError::kProtocolViolation,
-                      "Received SETUP on non-control stream");
-      return;
-    }
-  } else {
-    is_control_stream_ = true;
-  }
+void MoqtSession::ControlStream::OnServerSetupMessage(
+    const MoqtServerSetup& message) {
   if (perspective() == Perspective::IS_SERVER) {
     session_->Error(MoqtError::kProtocolViolation,
                     "Received SERVER_SETUP from client");
@@ -703,10 +659,9 @@
   session_->peer_role_ = *message.role;
 }
 
-void MoqtSession::Stream::SendSubscribeError(const MoqtSubscribe& message,
-                                             SubscribeErrorCode error_code,
-                                             absl::string_view reason_phrase,
-                                             uint64_t track_alias) {
+void MoqtSession::ControlStream::SendSubscribeError(
+    const MoqtSubscribe& message, SubscribeErrorCode error_code,
+    absl::string_view reason_phrase, uint64_t track_alias) {
   MoqtSubscribeError subscribe_error;
   subscribe_error.subscribe_id = message.subscribe_id;
   subscribe_error.error_code = error_code;
@@ -716,11 +671,9 @@
       session_->framer_.SerializeSubscribeError(subscribe_error));
 }
 
-void MoqtSession::Stream::OnSubscribeMessage(const MoqtSubscribe& message) {
+void MoqtSession::ControlStream::OnSubscribeMessage(
+    const MoqtSubscribe& message) {
   std::string reason_phrase = "";
-  if (!CheckIfIsControlStream()) {
-    return;
-  }
   if (session_->peer_role_ == MoqtRole::kPublisher) {
     QUIC_DLOG(INFO) << ENDPOINT << "Publisher peer sent SUBSCRIBE";
     session_->Error(MoqtError::kProtocolViolation,
@@ -825,10 +778,8 @@
   }
 }
 
-void MoqtSession::Stream::OnSubscribeOkMessage(const MoqtSubscribeOk& message) {
-  if (!CheckIfIsControlStream()) {
-    return;
-  }
+void MoqtSession::ControlStream::OnSubscribeOkMessage(
+    const MoqtSubscribeOk& message) {
   auto it = session_->active_subscribes_.find(message.subscribe_id);
   if (it == session_->active_subscribes_.end()) {
     session_->Error(MoqtError::kProtocolViolation,
@@ -860,11 +811,8 @@
   session_->active_subscribes_.erase(it);
 }
 
-void MoqtSession::Stream::OnSubscribeErrorMessage(
+void MoqtSession::ControlStream::OnSubscribeErrorMessage(
     const MoqtSubscribeError& message) {
-  if (!CheckIfIsControlStream()) {
-    return;
-  }
   auto it = session_->active_subscribes_.find(message.subscribe_id);
   if (it == session_->active_subscribes_.end()) {
     session_->Error(MoqtError::kProtocolViolation,
@@ -894,12 +842,13 @@
   session_->active_subscribes_.erase(it);
 }
 
-void MoqtSession::Stream::OnUnsubscribeMessage(const MoqtUnsubscribe& message) {
+void MoqtSession::ControlStream::OnUnsubscribeMessage(
+    const MoqtUnsubscribe& message) {
   session_->SubscribeIsDone(message.subscribe_id,
                             SubscribeDoneCode::kUnsubscribed, "");
 }
 
-void MoqtSession::Stream::OnSubscribeUpdateMessage(
+void MoqtSession::ControlStream::OnSubscribeUpdateMessage(
     const MoqtSubscribeUpdate& message) {
   // Search all the tracks to find the subscribe ID.
   auto name_it =
@@ -934,16 +883,14 @@
   }
 }
 
-void MoqtSession::Stream::OnAnnounceMessage(const MoqtAnnounce& message) {
+void MoqtSession::ControlStream::OnAnnounceMessage(
+    const MoqtAnnounce& message) {
   if (session_->peer_role_ == MoqtRole::kSubscriber) {
     QUIC_DLOG(INFO) << ENDPOINT << "Subscriber peer sent SUBSCRIBE";
     session_->Error(MoqtError::kProtocolViolation,
                     "Received ANNOUNCE from Subscriber");
     return;
   }
-  if (!CheckIfIsControlStream()) {
-    return;
-  }
   std::optional<MoqtAnnounceErrorReason> error =
       session_->callbacks_.incoming_announce_callback(message.track_namespace);
   if (error.has_value()) {
@@ -959,10 +906,8 @@
   SendOrBufferMessage(session_->framer_.SerializeAnnounceOk(ok));
 }
 
-void MoqtSession::Stream::OnAnnounceOkMessage(const MoqtAnnounceOk& message) {
-  if (!CheckIfIsControlStream()) {
-    return;
-  }
+void MoqtSession::ControlStream::OnAnnounceOkMessage(
+    const MoqtAnnounceOk& message) {
   auto it = session_->pending_outgoing_announces_.find(message.track_namespace);
   if (it == session_->pending_outgoing_announces_.end()) {
     session_->Error(MoqtError::kProtocolViolation,
@@ -973,11 +918,8 @@
   session_->pending_outgoing_announces_.erase(it);
 }
 
-void MoqtSession::Stream::OnAnnounceErrorMessage(
+void MoqtSession::ControlStream::OnAnnounceErrorMessage(
     const MoqtAnnounceError& message) {
-  if (!CheckIfIsControlStream()) {
-    return;
-  }
   auto it = session_->pending_outgoing_announces_.find(message.track_namespace);
   if (it == session_->pending_outgoing_announces_.end()) {
     session_->Error(MoqtError::kProtocolViolation,
@@ -991,34 +933,23 @@
   session_->pending_outgoing_announces_.erase(it);
 }
 
-void MoqtSession::Stream::OnAnnounceCancelMessage(
+void MoqtSession::ControlStream::OnAnnounceCancelMessage(
     const MoqtAnnounceCancel& message) {
   session_->CancelAnnounce(message.track_namespace);
 }
 
-void MoqtSession::Stream::OnParsingError(MoqtError error_code,
-                                         absl::string_view reason) {
+void MoqtSession::ControlStream::OnParsingError(MoqtError error_code,
+                                                absl::string_view reason) {
   session_->Error(error_code, absl::StrCat("Parse error: ", reason));
 }
 
-bool MoqtSession::Stream::CheckIfIsControlStream() {
-  if (!is_control_stream_.has_value()) {
-    session_->Error(MoqtError::kProtocolViolation,
-                    "Received SUBSCRIBE_REQUEST as first message");
-    return false;
-  }
-  if (!*is_control_stream_) {
-    session_->Error(MoqtError::kProtocolViolation,
-                    "Received SUBSCRIBE_REQUEST on non-control stream");
-    return false;
-  }
-  return true;
-}
-
-void MoqtSession::Stream::SendOrBufferMessage(quiche::QuicheBuffer message,
-                                              bool fin) {
+void MoqtSession::ControlStream::SendOrBufferMessage(
+    quiche::QuicheBuffer message, bool fin) {
   quiche::StreamWriteOptions options;
   options.set_send_fin(fin);
+  // TODO: while we buffer unconditionally, we should still at some point tear
+  // down the connection if we've buffered too many control messages; otherwise,
+  // there is potential for memory exhaustion attacks.
   options.set_buffer_unconditionally(true);
   std::array<absl::string_view, 1> write_vector = {message.AsStringView()};
   absl::Status success = stream_->Writev(absl::MakeSpan(write_vector), options);
@@ -1028,4 +959,56 @@
   }
 }
 
+void MoqtSession::IncomingDataStream::OnObjectMessage(const MoqtObject& message,
+                                                      absl::string_view payload,
+                                                      bool end_of_message) {
+  QUICHE_DVLOG(1)
+      << ENDPOINT << "Received OBJECT message on stream "
+      << stream_->GetStreamId() << " for subscribe_id " << message.subscribe_id
+      << " for track alias " << message.track_alias << " with sequence "
+      << message.group_id << ":" << message.object_id << " send_order "
+      << message.object_send_order << " forwarding_preference "
+      << MoqtForwardingPreferenceToString(message.forwarding_preference)
+      << " length " << payload.size() << " explicit length "
+      << (message.payload_length.has_value() ? (int)*message.payload_length
+                                             : -1)
+      << (end_of_message ? "F" : "");
+  if (!session_->parameters_.deliver_partial_objects) {
+    if (!end_of_message) {  // Buffer partial object.
+      absl::StrAppend(&partial_object_, payload);
+      return;
+    }
+    if (!partial_object_.empty()) {  // Completes the object
+      absl::StrAppend(&partial_object_, payload);
+      payload = absl::string_view(partial_object_);
+    }
+  }
+  auto [full_track_name, visitor] = session_->TrackPropertiesFromAlias(message);
+  if (visitor != nullptr) {
+    visitor->OnObjectFragment(
+        full_track_name, message.group_id, message.object_id,
+        message.object_send_order, message.object_status,
+        message.forwarding_preference, payload, end_of_message);
+  }
+  partial_object_.clear();
+}
+
+void MoqtSession::IncomingDataStream::OnCanRead() {
+  ForwardStreamDataToParser(*stream_, parser_);
+}
+
+void MoqtSession::IncomingDataStream::OnControlMessageReceived() {
+  session_->Error(MoqtError::kProtocolViolation,
+                  "Received a control message on a data stream");
+}
+
+void MoqtSession::IncomingDataStream::OnParsingError(MoqtError error_code,
+                                                     absl::string_view reason) {
+  session_->Error(error_code, absl::StrCat("Parse error: ", reason));
+}
+
+void MoqtSession::OutgoingDataStream::OnCanWrite() {
+  // TODO: handle backpressure on data streams.
+}
+
 }  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index 9d88da9..b4a0171 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -149,19 +149,14 @@
 
  private:
   friend class test::MoqtSessionPeer;
-  class QUICHE_EXPORT Stream : public webtransport::StreamVisitor,
-                               public MoqtParserVisitor {
+
+  class QUICHE_EXPORT ControlStream : public webtransport::StreamVisitor,
+                                      public MoqtParserVisitor {
    public:
-    Stream(MoqtSession* session, webtransport::Stream* stream)
+    ControlStream(MoqtSession* session, webtransport::Stream* stream)
         : session_(session),
           stream_(stream),
           parser_(session->parameters_.using_webtrans, *this) {}
-    Stream(MoqtSession* session, webtransport::Stream* stream,
-           bool is_control_stream)
-        : session_(session),
-          stream_(stream),
-          parser_(session->parameters_.using_webtrans, *this),
-          is_control_stream_(is_control_stream) {}
 
     // webtransport::StreamVisitor implementation.
     void OnCanRead() override;
@@ -212,22 +207,123 @@
                             SubscribeErrorCode error_code,
                             absl::string_view reason_phrase,
                             uint64_t track_alias);
-    bool CheckIfIsControlStream();
 
     MoqtSession* session_;
     webtransport::Stream* stream_;
     MoqtParser parser_;
-    // nullopt means "incoming stream, and we don't know if it's the control
-    // stream or a data stream yet".
-    std::optional<bool> is_control_stream_;
+  };
+  class QUICHE_EXPORT IncomingDataStream : public webtransport::StreamVisitor,
+                                           public MoqtParserVisitor {
+   public:
+    IncomingDataStream(MoqtSession* session, webtransport::Stream* stream)
+        : session_(session),
+          stream_(stream),
+          parser_(session->parameters_.using_webtrans, *this) {}
+
+    // webtransport::StreamVisitor implementation.
+    void OnCanRead() override;
+    void OnCanWrite() override {}
+    void OnResetStreamReceived(webtransport::StreamErrorCode error) override {}
+    void OnStopSendingReceived(webtransport::StreamErrorCode error) override {}
+    void OnWriteSideInDataRecvdState() override {}
+
+    // MoqtParserVisitor implementation.
+    // TODO: Handle a stream FIN.
+    void OnObjectMessage(const MoqtObject& message, absl::string_view payload,
+                         bool end_of_message) override;
+    void OnClientSetupMessage(const MoqtClientSetup&) override {
+      OnControlMessageReceived();
+    }
+    void OnServerSetupMessage(const MoqtServerSetup&) override {
+      OnControlMessageReceived();
+    }
+    void OnSubscribeMessage(const MoqtSubscribe&) override {
+      OnControlMessageReceived();
+    }
+    void OnSubscribeOkMessage(const MoqtSubscribeOk&) override {
+      OnControlMessageReceived();
+    }
+    void OnSubscribeErrorMessage(const MoqtSubscribeError&) override {
+      OnControlMessageReceived();
+    }
+    void OnUnsubscribeMessage(const MoqtUnsubscribe&) override {
+      OnControlMessageReceived();
+    }
+    void OnSubscribeDoneMessage(const MoqtSubscribeDone&) override {
+      OnControlMessageReceived();
+    }
+    void OnSubscribeUpdateMessage(const MoqtSubscribeUpdate&) override {
+      OnControlMessageReceived();
+    }
+    void OnAnnounceMessage(const MoqtAnnounce&) override {
+      OnControlMessageReceived();
+    }
+    void OnAnnounceOkMessage(const MoqtAnnounceOk&) override {
+      OnControlMessageReceived();
+    }
+    void OnAnnounceErrorMessage(const MoqtAnnounceError&) override {
+      OnControlMessageReceived();
+    }
+    void OnAnnounceCancelMessage(const MoqtAnnounceCancel& message) override {
+      OnControlMessageReceived();
+    }
+    void OnTrackStatusRequestMessage(
+        const MoqtTrackStatusRequest& message) override {
+      OnControlMessageReceived();
+    }
+    void OnUnannounceMessage(const MoqtUnannounce&) override {
+      OnControlMessageReceived();
+    }
+    void OnTrackStatusMessage(const MoqtTrackStatus&) override {
+      OnControlMessageReceived();
+    }
+    void OnGoAwayMessage(const MoqtGoAway&) override {
+      OnControlMessageReceived();
+    }
+    void OnParsingError(MoqtError error_code,
+                        absl::string_view reason) override;
+
+    quic::Perspective perspective() const {
+      return session_->parameters_.perspective;
+    }
+
+    webtransport::Stream* stream() const { return stream_; }
+
+   private:
+    friend class test::MoqtSessionPeer;
+    void OnControlMessageReceived();
+
+    MoqtSession* session_;
+    webtransport::Stream* stream_;
+    MoqtParser parser_;
     std::string partial_object_;
   };
+  class QUICHE_EXPORT OutgoingDataStream : public webtransport::StreamVisitor {
+   public:
+    OutgoingDataStream(MoqtSession* session, webtransport::Stream* stream)
+        : session_(session), stream_(stream) {}
+
+    // webtransport::StreamVisitor implementation.
+    void OnCanRead() override {}
+    void OnCanWrite() override;
+    void OnResetStreamReceived(webtransport::StreamErrorCode error) override {}
+    void OnStopSendingReceived(webtransport::StreamErrorCode error) override {}
+    void OnWriteSideInDataRecvdState() override {}
+
+    webtransport::Stream* stream() const { return stream_; }
+
+   private:
+    friend class test::MoqtSessionPeer;
+
+    MoqtSession* session_;
+    webtransport::Stream* stream_;
+  };
 
   // Returns true if SUBSCRIBE_DONE was sent.
   bool SubscribeIsDone(uint64_t subscribe_id, SubscribeDoneCode code,
                        absl::string_view reason_phrase);
   // Returns the pointer to the control stream, or nullptr if none is present.
-  Stream* GetControlStream();
+  ControlStream* GetControlStream();
   // Sends a message on the control stream; QUICHE_DCHECKs if no control stream
   // is present.
   void SendControlMessage(quiche::QuicheBuffer message);
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index a002462..f8e64e8 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -66,8 +66,8 @@
  public:
   static std::unique_ptr<MoqtParserVisitor> CreateControlStream(
       MoqtSession* session, webtransport::test::MockStream* stream) {
-    auto new_stream = std::make_unique<MoqtSession::Stream>(
-        session, stream, /*is_control_stream=*/true);
+    auto new_stream =
+        std::make_unique<MoqtSession::ControlStream>(session, stream);
     session->control_stream_ = kControlStreamId;
     EXPECT_CALL(*stream, visitor())
         .Times(AnyNumber())
@@ -75,10 +75,10 @@
     return new_stream;
   }
 
-  static std::unique_ptr<MoqtParserVisitor> CreateUniStream(
+  static std::unique_ptr<MoqtParserVisitor> CreateIncomingDataStream(
       MoqtSession* session, webtransport::Stream* stream) {
-    auto new_stream = std::make_unique<MoqtSession::Stream>(
-        session, stream, /*is_control_stream=*/false);
+    auto new_stream =
+        std::make_unique<MoqtSession::IncomingDataStream>(session, stream);
     return new_stream;
   }
 
@@ -91,7 +91,7 @@
   // stream created by the MoqtSession.
   static MoqtParserVisitor* FetchParserVisitorFromWebtransportStreamVisitor(
       MoqtSession* session, webtransport::StreamVisitor* visitor) {
-    return (MoqtSession::Stream*)visitor;
+    return (MoqtSession::ControlStream*)visitor;
   }
 
   static void CreateRemoteTrack(MoqtSession* session, const FullTrackName& name,
@@ -581,7 +581,7 @@
   };
   StrictMock<webtransport::test::MockStream> mock_stream;
   std::unique_ptr<MoqtParserVisitor> object_stream =
-      MoqtSessionPeer::CreateUniStream(&session_, &mock_stream);
+      MoqtSessionPeer::CreateIncomingDataStream(&session_, &mock_stream);
 
   EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _, _, _)).Times(1);
   EXPECT_CALL(mock_stream, GetStreamId())
@@ -606,7 +606,7 @@
   };
   StrictMock<webtransport::test::MockStream> mock_stream;
   std::unique_ptr<MoqtParserVisitor> object_stream =
-      MoqtSessionPeer::CreateUniStream(&session_, &mock_stream);
+      MoqtSessionPeer::CreateIncomingDataStream(&session_, &mock_stream);
 
   EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _, _, _)).Times(1);
   EXPECT_CALL(mock_stream, GetStreamId())
@@ -641,7 +641,7 @@
   };
   StrictMock<webtransport::test::MockStream> mock_stream;
   std::unique_ptr<MoqtParserVisitor> object_stream =
-      MoqtSessionPeer::CreateUniStream(&session, &mock_stream);
+      MoqtSessionPeer::CreateIncomingDataStream(&session, &mock_stream);
 
   EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _, _, _)).Times(2);
   EXPECT_CALL(mock_stream, GetStreamId())
@@ -677,7 +677,7 @@
   };
   StrictMock<webtransport::test::MockStream> mock_stream;
   std::unique_ptr<MoqtParserVisitor> object_stream =
-      MoqtSessionPeer::CreateUniStream(&session_, &mock_stream);
+      MoqtSessionPeer::CreateIncomingDataStream(&session_, &mock_stream);
 
   EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _, _, _))
       .WillOnce([&](const FullTrackName& full_track_name,
@@ -733,7 +733,7 @@
   };
   StrictMock<webtransport::test::MockStream> mock_stream;
   std::unique_ptr<MoqtParserVisitor> object_stream =
-      MoqtSessionPeer::CreateUniStream(&session_, &mock_stream);
+      MoqtSessionPeer::CreateIncomingDataStream(&session_, &mock_stream);
 
   EXPECT_CALL(visitor, OnObjectFragment(_, _, _, _, _, _, _, _))
       .WillOnce([&](const FullTrackName& full_track_name,
@@ -793,7 +793,7 @@
   };
   StrictMock<webtransport::test::MockStream> mock_stream;
   std::unique_ptr<MoqtParserVisitor> object_stream =
-      MoqtSessionPeer::CreateUniStream(&session_, &mock_stream);
+      MoqtSessionPeer::CreateIncomingDataStream(&session_, &mock_stream);
 
   EXPECT_CALL(visitor, OnObjectFragment(_, _, _, _, _, _, _, _))
       .WillOnce([&](const FullTrackName& full_track_name,
@@ -844,7 +844,7 @@
   };
   StrictMock<webtransport::test::MockStream> mock_stream;
   std::unique_ptr<MoqtParserVisitor> object_stream =
-      MoqtSessionPeer::CreateUniStream(&session_, &mock_stream);
+      MoqtSessionPeer::CreateIncomingDataStream(&session_, &mock_stream);
 
   EXPECT_CALL(visitor, OnObjectFragment(_, _, _, _, _, _, _, _))
       .WillOnce([&](const FullTrackName& full_track_name,
@@ -881,7 +881,7 @@
   control_stream->OnSubscribeOkMessage(ok);
 }
 
-TEST_F(MoqtSessionTest, CreateUniStreamAndSend) {
+TEST_F(MoqtSessionTest, CreateIncomingDataStreamAndSend) {
   StrictMock<webtransport::test::MockStream> mock_stream;
   FullTrackName ftn("foo", "bar");
   MockLocalTrackVisitor track_visitor;
@@ -1174,7 +1174,7 @@
   };
   StrictMock<webtransport::test::MockStream> mock_stream;
   std::unique_ptr<MoqtParserVisitor> object_stream =
-      MoqtSessionPeer::CreateUniStream(&session_, &mock_stream);
+      MoqtSessionPeer::CreateIncomingDataStream(&session_, &mock_stream);
 
   EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _, _, _)).Times(1);
   EXPECT_CALL(mock_stream, GetStreamId())
