Add support such that quic connection can send/receive NEW_CONNECTION_ID &
RETIRE_CONNECTION_ID frames.

Note server does not send NEW_CONNECTION_ID in this change.
Server can receive NEW_CONNECTION_ID and send RETIRE_CONNECTION_ID frames in short header packets but this is a no-op.

Protected by FLAGS_quic_reloadable_flag_quic_connection_support_multiple_cids.

PiperOrigin-RevId: 361561851
Change-Id: Iadf38e2cd28ccb30e2b872a9593800b736abf15a
diff --git a/quic/core/quic_connection.cc b/quic/core/quic_connection.cc
index b64c565..cde42a8 100644
--- a/quic/core/quic_connection.cc
+++ b/quic/core/quic_connection.cc
@@ -385,6 +385,16 @@
   if (use_encryption_level_context_) {
     QUIC_RELOADABLE_FLAG_COUNT(quic_use_encryption_level_context);
   }
+
+  support_multiple_connection_ids_ =
+      version().HasIetfQuicFrames() &&
+      framer_.do_not_synthesize_source_cid_for_short_header() &&
+      GetQuicRestartFlag(quic_use_reference_counted_sesssion_map) &&
+      GetQuicRestartFlag(quic_time_wait_list_support_multiple_cid_v2) &&
+      GetQuicRestartFlag(
+          quic_dispatcher_support_multiple_cid_per_connection_v2) &&
+      GetQuicReloadableFlag(quic_connection_support_multiple_cids);
+
   QUIC_DLOG(INFO) << ENDPOINT << "Created connection with server connection ID "
                   << server_connection_id
                   << " and version: " << ParsedQuicVersionToString(version());
@@ -581,6 +591,16 @@
                        config.max_idle_time_before_crypto_handshake());
   }
 
+  if (support_multiple_connection_ids_ &&
+      config.HasReceivedPreferredAddressConnectionIdAndToken()) {
+    QuicNewConnectionIdFrame frame;
+    std::tie(frame.connection_id, frame.stateless_reset_token) =
+        config.ReceivedPreferredAddressConnectionIdAndToken();
+    frame.sequence_number = 1u;
+    frame.retire_prior_to = 0u;
+    OnNewConnectionIdFrameInner(frame);
+  }
+
   sent_packet_manager_.SetFromConfig(config);
   if (perspective_ == Perspective::IS_SERVER &&
       config.HasClientSentConnectionOption(kAFF2, perspective_)) {
@@ -957,8 +977,7 @@
   QUICHE_DCHECK(!retry_source_connection_id_.has_value())
       << retry_source_connection_id_.value();
   retry_source_connection_id_ = new_connection_id;
-  server_connection_id_ = new_connection_id;
-  packet_creator_.SetServerConnectionId(server_connection_id_);
+  ReplaceInitialServerConnectionId(new_connection_id);
   packet_creator_.SetRetryToken(retry_token);
 
   // Reinstall initial crypters because the connection ID changed.
@@ -1854,6 +1873,28 @@
   return connected_;
 }
 
+bool QuicConnection::OnNewConnectionIdFrameInner(
+    const QuicNewConnectionIdFrame& frame) {
+  QUICHE_DCHECK(support_multiple_connection_ids_);
+  if (peer_issued_cid_manager_ == nullptr) {
+    CloseConnection(
+        IETF_QUIC_PROTOCOL_VIOLATION,
+        "Receives NEW_CONNECTION_ID while peer uses zero length connection ID",
+        ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
+    return false;
+  }
+  std::string error_detail;
+  QuicErrorCode error =
+      peer_issued_cid_manager_->OnNewConnectionIdFrame(frame, &error_detail);
+  if (error != QUIC_NO_ERROR) {
+    CloseConnection(error, error_detail,
+                    ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
+    return false;
+  }
+  QUIC_RELOADABLE_FLAG_COUNT_N(quic_connection_support_multiple_cids, 1, 2);
+  return true;
+}
+
 bool QuicConnection::OnNewConnectionIdFrame(
     const QuicNewConnectionIdFrame& frame) {
   QUIC_BUG_IF(!connected_) << "Processing NEW_CONNECTION_ID frame when "
@@ -1866,7 +1907,10 @@
   if (debug_visitor_ != nullptr) {
     debug_visitor_->OnNewConnectionIdFrame(frame);
   }
-  return true;
+  if (!support_multiple_connection_ids_) {
+    return true;
+  }
+  return OnNewConnectionIdFrameInner(frame);
 }
 
 bool QuicConnection::OnRetireConnectionIdFrame(
@@ -1881,6 +1925,25 @@
   if (debug_visitor_ != nullptr) {
     debug_visitor_->OnRetireConnectionIdFrame(frame);
   }
+  if (!support_multiple_connection_ids_) {
+    return true;
+  }
+  if (self_issued_cid_manager_ == nullptr) {
+    CloseConnection(
+        IETF_QUIC_PROTOCOL_VIOLATION,
+        "Receives RETIRE_CONNECTION_ID while new connection ID is never issued",
+        ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
+    return false;
+  }
+  std::string error_detail;
+  QuicErrorCode error = self_issued_cid_manager_->OnRetireConnectionIdFrame(
+      frame, sent_packet_manager_.GetPtoDelay(), &error_detail);
+  if (error != QUIC_NO_ERROR) {
+    CloseConnection(error, error_detail,
+                    ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
+    return false;
+  }
+  QUIC_RELOADABLE_FLAG_COUNT_N(quic_connection_support_multiple_cids, 2, 2);
   return true;
 }
 
@@ -2751,6 +2814,32 @@
   }
 }
 
+void QuicConnection::ReplaceInitialServerConnectionId(
+    const QuicConnectionId& new_server_connection_id) {
+  QUICHE_DCHECK(perspective_ == Perspective::IS_CLIENT);
+  if (support_multiple_connection_ids_) {
+    if (new_server_connection_id.IsEmpty()) {
+      peer_issued_cid_manager_ = nullptr;
+    } else {
+      if (peer_issued_cid_manager_ != nullptr) {
+        QUIC_BUG_IF(!peer_issued_cid_manager_->IsConnectionIdActive(
+            server_connection_id_))
+            << "Connection ID replaced header is no longer active. old id: "
+            << server_connection_id_ << " new_id: " << new_server_connection_id;
+        peer_issued_cid_manager_->ReplaceConnectionId(server_connection_id_,
+                                                      new_server_connection_id);
+      } else {
+        peer_issued_cid_manager_ =
+            std::make_unique<QuicPeerIssuedConnectionIdManager>(
+                kMinNumOfActiveConnectionIds, new_server_connection_id, clock_,
+                alarm_factory_, this);
+      }
+    }
+  }
+  server_connection_id_ = new_server_connection_id;
+  packet_creator_.SetServerConnectionId(server_connection_id_);
+}
+
 bool QuicConnection::ProcessValidatedPacket(const QuicPacketHeader& header) {
   if (perspective_ == Perspective::IS_SERVER &&
       default_path_.self_address.IsInitialized() &&
@@ -2788,8 +2877,7 @@
     if (!original_destination_connection_id_.has_value()) {
       original_destination_connection_id_ = server_connection_id_;
     }
-    server_connection_id_ = header.source_connection_id;
-    packet_creator_.SetServerConnectionId(server_connection_id_);
+    ReplaceInitialServerConnectionId(header.source_connection_id);
   }
 
   if (!ValidateReceivedPacketNumber(header.packet_number)) {
@@ -3706,6 +3794,27 @@
   }
 }
 
+std::unique_ptr<QuicSelfIssuedConnectionIdManager>
+QuicConnection::MakeSelfIssuedConnectionIdManager() {
+  QUICHE_DCHECK((perspective_ == Perspective::IS_CLIENT &&
+                 !client_connection_id_.IsEmpty()) ||
+                (perspective_ == Perspective::IS_SERVER &&
+                 !server_connection_id_.IsEmpty()));
+  return std::make_unique<QuicSelfIssuedConnectionIdManager>(
+      kMinNumOfActiveConnectionIds,
+      perspective_ == Perspective::IS_CLIENT ? client_connection_id_
+                                             : server_connection_id_,
+      clock_, alarm_factory_, this);
+}
+
+void QuicConnection::MaybeSendConnectionIdToClient() {
+  if (perspective_ == Perspective::IS_CLIENT) {
+    return;
+  }
+  QUICHE_DCHECK(self_issued_cid_manager_ != nullptr);
+  self_issued_cid_manager_->MaybeSendNewConnectionIds();
+}
+
 void QuicConnection::OnHandshakeComplete() {
   sent_packet_manager_.SetHandshakeConfirmed();
   if (send_ack_frequency_on_handshake_completion_ &&
@@ -4300,6 +4409,8 @@
   if (use_path_validator_) {
     CancelPathValidation();
   }
+  peer_issued_cid_manager_.reset();
+  self_issued_cid_manager_.reset();
 }
 
 void QuicConnection::CancelAllAlarms() {
@@ -5813,6 +5924,19 @@
   }
   client_connection_id_ = client_connection_id;
   client_connection_id_is_set_ = true;
+  if (support_multiple_connection_ids_ && !client_connection_id_.IsEmpty()) {
+    if (perspective_ == Perspective::IS_SERVER) {
+      QUICHE_DCHECK(peer_issued_cid_manager_ == nullptr);
+      peer_issued_cid_manager_ =
+          std::make_unique<QuicPeerIssuedConnectionIdManager>(
+              kMinNumOfActiveConnectionIds, client_connection_id_, clock_,
+              alarm_factory_, this);
+    } else {
+      // Note in Chromium client, set_client_connection_id is not called and
+      // thus self_issued_cid_manager_ should be null.
+      self_issued_cid_manager_ = MakeSelfIssuedConnectionIdManager();
+    }
+  }
   QUIC_DLOG(INFO) << ENDPOINT << "setting client connection ID to "
                   << client_connection_id_
                   << " for connection with server connection ID "
@@ -5893,6 +6017,62 @@
                   idle_timeout_connection_close_behavior_);
 }
 
+void QuicConnection::OnPeerIssuedConnectionIdRetired() {
+  QUICHE_DCHECK(peer_issued_cid_manager_ != nullptr);
+  QuicConnectionId* default_path_cid = perspective_ == Perspective::IS_CLIENT
+                                           ? &server_connection_id_
+                                           : &client_connection_id_;
+  if (!default_path_cid->IsEmpty() &&
+      !peer_issued_cid_manager_->IsConnectionIdActive(*default_path_cid)) {
+    *default_path_cid = QuicConnectionId();
+  }
+  if (default_path_cid->IsEmpty()) {
+    // Try setting a new connection ID now such that subsequent
+    // RetireConnectionId frames can be sent on the default path.
+    const QuicConnectionIdData* unused_connection_id_data =
+        peer_issued_cid_manager_->ConsumeOneUnusedConnectionId();
+    if (unused_connection_id_data != nullptr) {
+      *default_path_cid = unused_connection_id_data->connection_id;
+      received_stateless_reset_token_ =
+          unused_connection_id_data->stateless_reset_token;
+      stateless_reset_token_received_ = true;
+      if (perspective_ == Perspective::IS_CLIENT) {
+        packet_creator_.SetServerConnectionId(
+            unused_connection_id_data->connection_id);
+      } else {
+        packet_creator_.SetClientConnectionId(
+            unused_connection_id_data->connection_id);
+      }
+    }
+  }
+
+  std::vector<uint64_t> retired_cid_sequence_numbers =
+      peer_issued_cid_manager_->ConsumeToBeRetiredConnectionIdSequenceNumbers();
+  QUICHE_DCHECK(!retired_cid_sequence_numbers.empty());
+  for (const auto& sequence_number : retired_cid_sequence_numbers) {
+    visitor_->SendRetireConnectionId(sequence_number);
+  }
+}
+
+bool QuicConnection::SendNewConnectionId(
+    const QuicNewConnectionIdFrame& frame) {
+  QUICHE_DCHECK(perspective_ == Perspective::IS_SERVER);
+  visitor_->SendNewConnectionId(frame);
+  return connected_;
+}
+
+void QuicConnection::OnNewConnectionIdIssued(
+    const QuicConnectionId& connection_id) {
+  QUICHE_DCHECK(perspective_ == Perspective::IS_SERVER);
+  visitor_->OnServerConnectionIdIssued(connection_id);
+}
+
+void QuicConnection::OnSelfIssuedConnectionIdRetired(
+    const QuicConnectionId& connection_id) {
+  QUICHE_DCHECK(perspective_ == Perspective::IS_SERVER);
+  visitor_->OnServerConnectionIdRetired(connection_id);
+}
+
 void QuicConnection::MaybeUpdateAckTimeout() {
   if (should_last_packet_instigate_acks_) {
     return;
@@ -6111,7 +6291,28 @@
 
 std::vector<QuicConnectionId> QuicConnection::GetActiveServerConnectionIds()
     const {
-  return {server_connection_id_};
+  if (!support_multiple_connection_ids_ ||
+      self_issued_cid_manager_ == nullptr) {
+    return {server_connection_id_};
+  }
+  return self_issued_cid_manager_->GetUnretiredConnectionIds();
+}
+
+void QuicConnection::CreateConnectionIdManager() {
+  if (!support_multiple_connection_ids_) {
+    return;
+  }
+
+  if (perspective_ == Perspective::IS_CLIENT) {
+    if (!server_connection_id_.IsEmpty()) {
+      peer_issued_cid_manager_ =
+          std::make_unique<QuicPeerIssuedConnectionIdManager>(
+              kMinNumOfActiveConnectionIds, server_connection_id_, clock_,
+              alarm_factory_, this);
+    }
+  } else {
+    self_issued_cid_manager_ = MakeSelfIssuedConnectionIdManager();
+  }
 }
 
 void QuicConnection::SetUnackedMapInitialCapacity() {
diff --git a/quic/core/quic_connection.h b/quic/core/quic_connection.h
index 21e4c68..1e0893f 100644
--- a/quic/core/quic_connection.h
+++ b/quic/core/quic_connection.h
@@ -31,12 +31,14 @@
 #include "quic/core/crypto/transport_parameters.h"
 #include "quic/core/frames/quic_ack_frequency_frame.h"
 #include "quic/core/frames/quic_max_streams_frame.h"
+#include "quic/core/frames/quic_new_connection_id_frame.h"
 #include "quic/core/proto/cached_network_parameters_proto.h"
 #include "quic/core/quic_alarm.h"
 #include "quic/core/quic_alarm_factory.h"
 #include "quic/core/quic_blocked_writer_interface.h"
 #include "quic/core/quic_circular_deque.h"
 #include "quic/core/quic_connection_id.h"
+#include "quic/core/quic_connection_id_manager.h"
 #include "quic/core/quic_connection_stats.h"
 #include "quic/core/quic_constants.h"
 #include "quic/core/quic_framer.h"
@@ -164,6 +166,20 @@
   // Called when an AckFrequency frame need to be sent.
   virtual void SendAckFrequency(const QuicAckFrequencyFrame& frame) = 0;
 
+  // Called to send a NEW_CONNECTION_ID frame.
+  virtual void SendNewConnectionId(const QuicNewConnectionIdFrame& frame) = 0;
+
+  // Called to send a RETIRE_CONNECTION_ID frame.
+  virtual void SendRetireConnectionId(uint64_t sequence_number) = 0;
+
+  // Called when server starts to use a server issued connection ID.
+  virtual void OnServerConnectionIdIssued(
+      const QuicConnectionId& server_connection_id) = 0;
+
+  // Called when server stops to use a server issued connection ID.
+  virtual void OnServerConnectionIdRetired(
+      const QuicConnectionId& server_connection_id) = 0;
+
   // Called to ask if the visitor wants to schedule write resumption as it both
   // has pending data to write, and is able to write (e.g. based on flow control
   // limits).
@@ -440,7 +456,8 @@
       public QuicSentPacketManager::NetworkChangeVisitor,
       public QuicNetworkBlackholeDetector::Delegate,
       public QuicIdleNetworkDetector::Delegate,
-      public QuicPathValidator::SendDelegate {
+      public QuicPathValidator::SendDelegate,
+      public QuicConnectionIdManagerVisitorInterface {
  public:
   // Constructs a new QuicConnection for |connection_id| and
   // |initial_peer_address| using |writer| to write packets. |owns_writer|
@@ -702,10 +719,21 @@
   void OnHandshakeTimeout() override;
   void OnIdleNetworkDetected() override;
 
+  // QuicConnectionIdManagerVisitorInterface
+  void OnPeerIssuedConnectionIdRetired() override;
+  bool SendNewConnectionId(const QuicNewConnectionIdFrame& frame) override;
+  void OnNewConnectionIdIssued(const QuicConnectionId& connection_id) override;
+  void OnSelfIssuedConnectionIdRetired(
+      const QuicConnectionId& connection_id) override;
+
   // Please note, this is not a const function. For logging purpose, please use
   // ack_frame().
   const QuicFrame GetUpdatedAckFrame();
 
+  // Called to send a new connection ID to client if the # of connection ID has
+  // not exceeded the active connection ID limits.
+  void MaybeSendConnectionIdToClient();
+
   // Called when the handshake completes. On the client side, handshake
   // completes on receipt of SHLO. On the server side, handshake completes when
   // SHLO gets ACKed (or a forward secure packet gets decrypted successfully).
@@ -1198,6 +1226,9 @@
 
   bool validate_client_address() const { return validate_client_addresses_; }
 
+  // Instantiates connection ID manager.
+  void CreateConnectionIdManager();
+
  protected:
   // Calls cancel() on all the alarms owned by this connection.
   void CancelAllAlarms();
@@ -1396,6 +1427,11 @@
   void TearDownLocalConnectionState(const QuicConnectionCloseFrame& frame,
                                     ConnectionCloseSource source);
 
+  // Replace server connection ID on the client side from retry packet or
+  // initial packets with a different source connection ID.
+  void ReplaceInitialServerConnectionId(
+      const QuicConnectionId& new_server_connection_id);
+
   // Writes the given packet to socket, encrypted with packet's
   // encryption_level. Returns true on successful write, and false if the writer
   // was blocked and the write needs to be tried again. Notifies the
@@ -1683,6 +1719,13 @@
   // Return true if framer should continue processing the packet.
   bool OnPathChallengeFrameInternal(const QuicPathChallengeFrame& frame);
 
+  virtual std::unique_ptr<QuicSelfIssuedConnectionIdManager>
+  MakeSelfIssuedConnectionIdManager();
+
+  // Process NewConnectionIdFrame either sent from peer or synsthesized from
+  // preferred_address transport parameter.
+  bool OnNewConnectionIdFrameInner(const QuicNewConnectionIdFrame& frame);
+
   QuicFramer framer_;
 
   // Contents received in the current packet, especially used to identify
@@ -1967,6 +2010,9 @@
   // True if the writer supports release timestamp.
   bool supports_release_time_;
 
+  std::unique_ptr<QuicPeerIssuedConnectionIdManager> peer_issued_cid_manager_;
+  std::unique_ptr<QuicSelfIssuedConnectionIdManager> self_issued_cid_manager_;
+
   // Time this connection can release packets into the future.
   QuicTime::Delta release_time_into_future_;
 
@@ -2107,6 +2153,8 @@
 
   // If true, upon seeing a new client address, validate the client address.
   const bool validate_client_addresses_;
+
+  bool support_multiple_connection_ids_ = false;
 };
 
 }  // namespace quic
diff --git a/quic/core/quic_connection_id_manager.h b/quic/core/quic_connection_id_manager.h
index 3f29cd4..0968277 100644
--- a/quic/core/quic_connection_id_manager.h
+++ b/quic/core/quic_connection_id_manager.h
@@ -24,6 +24,10 @@
 
 namespace quic {
 
+namespace test {
+class QuicConnectionIdManagerPeer;
+}  // namespace test
+
 struct QUIC_EXPORT_PRIVATE QuicConnectionIdData {
   QuicConnectionIdData(const QuicConnectionId& connection_id,
                        uint64_t sequence_number,
@@ -83,7 +87,7 @@
                            const QuicConnectionId& new_connection_id);
 
  private:
-  friend class QuicConnectionIdManagerPeer;
+  friend class test::QuicConnectionIdManagerPeer;
 
   bool IsConnectionIdNew(const QuicNewConnectionIdFrame& frame);
 
@@ -134,7 +138,7 @@
       const QuicConnectionId& old_connection_id) const;
 
  private:
-  friend class QuicConnectionIdManagerPeer;
+  friend class test::QuicConnectionIdManagerPeer;
 
   QuicNewConnectionIdFrame IssueNewConnectionId();
 
diff --git a/quic/core/quic_connection_id_manager_test.cc b/quic/core/quic_connection_id_manager_test.cc
index b9ca45f..9c7daf0 100644
--- a/quic/core/quic_connection_id_manager_test.cc
+++ b/quic/core/quic_connection_id_manager_test.cc
@@ -9,27 +9,15 @@
 #include "quic/core/quic_error_codes.h"
 #include "quic/platform/api/quic_test.h"
 #include "quic/test_tools/mock_clock.h"
+#include "quic/test_tools/quic_connection_id_manager_peer.h"
 #include "quic/test_tools/quic_test_utils.h"
 
 namespace quic {
-
-class QuicConnectionIdManagerPeer {
- public:
-  static QuicAlarm* GetRetirePeerIssuedConnectionIdAlarm(
-      QuicPeerIssuedConnectionIdManager* manager) {
-    return manager->retire_connection_id_alarm_.get();
-  }
-
-  static QuicAlarm* GetRetireSelfIssuedConnectionIdAlarm(
-      QuicSelfIssuedConnectionIdManager* manager) {
-    return manager->retire_connection_id_alarm_.get();
-  }
-};
-
 namespace {
 
 using ::quic::test::IsError;
 using ::quic::test::IsQuicNoError;
+using ::quic::test::QuicConnectionIdManagerPeer;
 using ::quic::test::TestConnectionId;
 using ::testing::_;
 using ::testing::ElementsAre;
diff --git a/quic/core/quic_connection_test.cc b/quic/core/quic_connection_test.cc
index ccea95b..a22d5c3 100644
--- a/quic/core/quic_connection_test.cc
+++ b/quic/core/quic_connection_test.cc
@@ -53,6 +53,7 @@
 using testing::AnyNumber;
 using testing::AtLeast;
 using testing::DoAll;
+using testing::ElementsAre;
 using testing::Ge;
 using testing::IgnoreResult;
 using testing::InSequence;
@@ -382,6 +383,7 @@
   // connection and peer creator.
   void set_perspective(Perspective perspective) {
     writer()->set_perspective(perspective);
+    QuicConnectionPeer::ResetPeerIssuedConnectionIdManager(this);
     QuicConnectionPeer::SetPerspective(this, perspective);
     QuicSentPacketManagerPeer::SetPerspective(
         QuicConnectionPeer::GetSentPacketManager(this), perspective);
@@ -473,6 +475,16 @@
         QuicConnectionPeer::GetBlackholeDetectorAlarm(this));
   }
 
+  TestAlarmFactory::TestAlarm* GetRetirePeerIssuedConnectionIdAlarm() {
+    return reinterpret_cast<TestAlarmFactory::TestAlarm*>(
+        QuicConnectionPeer::GetRetirePeerIssuedConnectionIdAlarm(this));
+  }
+
+  TestAlarmFactory::TestAlarm* GetRetireSelfIssuedConnectionIdAlarm() {
+    return reinterpret_cast<TestAlarmFactory::TestAlarm*>(
+        QuicConnectionPeer::GetRetireSelfIssuedConnectionIdAlarm(this));
+  }
+
   void PathDegradingTimeout() {
     QUICHE_DCHECK(PathDegradingDetectionInProgress());
     GetBlackholeDetectorAlarm()->Fire();
@@ -8923,23 +8935,23 @@
     EXPECT_CALL(visitor_,
                 OnPacketReceived(_, _, /*is_connectivity_probe=*/true))
         .Times(1);
-  const QuicSocketAddress kNewSelfAddress =
-      QuicSocketAddress(QuicIpAddress::Loopback6(), /*port=*/23456);
+    const QuicSocketAddress kNewSelfAddress =
+        QuicSocketAddress(QuicIpAddress::Loopback6(), /*port=*/23456);
 
-  std::unique_ptr<SerializedPacket> probing_packet = ConstructProbingPacket();
-  std::unique_ptr<QuicReceivedPacket> received(ConstructReceivedPacket(
-      QuicEncryptedPacket(probing_packet->encrypted_buffer,
-                          probing_packet->encrypted_length),
-      clock_.Now()));
-  uint64_t num_probing_received =
-      connection_.GetStats().num_connectivity_probing_received;
-  ProcessReceivedPacket(kNewSelfAddress, kPeerAddress, *received);
+    std::unique_ptr<SerializedPacket> probing_packet = ConstructProbingPacket();
+    std::unique_ptr<QuicReceivedPacket> received(ConstructReceivedPacket(
+        QuicEncryptedPacket(probing_packet->encrypted_buffer,
+                            probing_packet->encrypted_length),
+        clock_.Now()));
+    uint64_t num_probing_received =
+        connection_.GetStats().num_connectivity_probing_received;
+    ProcessReceivedPacket(kNewSelfAddress, kPeerAddress, *received);
 
-  EXPECT_EQ(num_probing_received + 1,
-            connection_.GetStats().num_connectivity_probing_received);
-  EXPECT_EQ(kPeerAddress, connection_.peer_address());
-  EXPECT_EQ(kPeerAddress, connection_.effective_peer_address());
-  EXPECT_TRUE(connection_.IsPathDegrading());
+    EXPECT_EQ(num_probing_received + 1,
+              connection_.GetStats().num_connectivity_probing_received);
+    EXPECT_EQ(kPeerAddress, connection_.peer_address());
+    EXPECT_EQ(kPeerAddress, connection_.effective_peer_address());
+    EXPECT_TRUE(connection_.IsPathDegrading());
   }
 
   // Verify new path degrading detection is activated.
@@ -13990,6 +14002,221 @@
   EXPECT_TRUE(connection_.HasPendingPathValidation());
 }
 
+TEST_P(QuicConnectionTest,
+       CloseConnectionAfterReceiveNewConnectionIdFromPeerUsingEmptyCID) {
+  if (!version().HasIetfQuicFrames()) {
+    return;
+  }
+  QuicConnectionPeer::EnableMultipleConnectionIdSupport(&connection_);
+  set_perspective(Perspective::IS_SERVER);
+  ASSERT_TRUE(connection_.client_connection_id().IsEmpty());
+
+  EXPECT_CALL(visitor_, BeforeConnectionCloseSent());
+  EXPECT_CALL(visitor_, OnConnectionClosed(_, ConnectionCloseSource::FROM_SELF))
+      .WillOnce(Invoke(this, &QuicConnectionTest::SaveConnectionCloseFrame));
+  QuicNewConnectionIdFrame frame;
+  frame.sequence_number = 1u;
+  frame.connection_id = TestConnectionId(1);
+  frame.stateless_reset_token =
+      QuicUtils::GenerateStatelessResetToken(frame.connection_id);
+  frame.retire_prior_to = 0u;
+
+  EXPECT_FALSE(connection_.OnNewConnectionIdFrame(frame));
+
+  EXPECT_FALSE(connection_.connected());
+  EXPECT_THAT(saved_connection_close_frame_.quic_error_code,
+              IsError(IETF_QUIC_PROTOCOL_VIOLATION));
+}
+
+TEST_P(QuicConnectionTest, NewConnectionIdFrameResultsInError) {
+  if (!version().HasIetfQuicFrames()) {
+    return;
+  }
+  QuicConnectionPeer::EnableMultipleConnectionIdSupport(&connection_);
+  connection_.CreateConnectionIdManager();
+  ASSERT_FALSE(connection_.connection_id().IsEmpty());
+
+  EXPECT_CALL(visitor_, OnConnectionClosed(_, ConnectionCloseSource::FROM_SELF))
+      .WillOnce(Invoke(this, &QuicConnectionTest::SaveConnectionCloseFrame));
+  QuicNewConnectionIdFrame frame;
+  frame.sequence_number = 1u;
+  frame.connection_id = connection_id_;  // Reuses connection ID casuing error.
+  frame.stateless_reset_token =
+      QuicUtils::GenerateStatelessResetToken(frame.connection_id);
+  frame.retire_prior_to = 0u;
+
+  EXPECT_FALSE(connection_.OnNewConnectionIdFrame(frame));
+
+  EXPECT_FALSE(connection_.connected());
+  EXPECT_THAT(saved_connection_close_frame_.quic_error_code,
+              IsError(IETF_QUIC_PROTOCOL_VIOLATION));
+}
+
+TEST_P(QuicConnectionTest,
+       ClientRetirePeerIssuedConnectionIdTriggeredByNewConnectionIdFrame) {
+  if (!version().HasIetfQuicFrames()) {
+    return;
+  }
+  QuicConnectionPeer::EnableMultipleConnectionIdSupport(&connection_);
+  connection_.CreateConnectionIdManager();
+
+  QuicNewConnectionIdFrame frame;
+  frame.sequence_number = 1u;
+  frame.connection_id = TestConnectionId(1);
+  frame.stateless_reset_token =
+      QuicUtils::GenerateStatelessResetToken(frame.connection_id);
+  frame.retire_prior_to = 0u;
+
+  EXPECT_TRUE(connection_.OnNewConnectionIdFrame(frame));
+  auto* retire_peer_issued_cid_alarm =
+      connection_.GetRetirePeerIssuedConnectionIdAlarm();
+  ASSERT_FALSE(retire_peer_issued_cid_alarm->IsSet());
+
+  frame.sequence_number = 2u;
+  frame.connection_id = TestConnectionId(2);
+  frame.stateless_reset_token =
+      QuicUtils::GenerateStatelessResetToken(frame.connection_id);
+  frame.retire_prior_to = 1u;  // CID associated with #1 will be retired.
+
+  EXPECT_TRUE(connection_.OnNewConnectionIdFrame(frame));
+  ASSERT_TRUE(retire_peer_issued_cid_alarm->IsSet());
+  EXPECT_EQ(connection_.connection_id(), connection_id_);
+
+  EXPECT_CALL(visitor_, SendRetireConnectionId(/*sequence_number=*/0u));
+  retire_peer_issued_cid_alarm->Fire();
+  EXPECT_EQ(connection_.connection_id(), TestConnectionId(2));
+  EXPECT_EQ(connection_.packet_creator().GetDestinationConnectionId(),
+            TestConnectionId(2));
+}
+
+TEST_P(QuicConnectionTest,
+       ServerRetirePeerIssuedConnectionIdTriggeredByNewConnectionIdFrame) {
+  if (!version().HasIetfQuicFrames()) {
+    return;
+  }
+  QuicConnectionPeer::EnableMultipleConnectionIdSupport(&connection_);
+  set_perspective(Perspective::IS_SERVER);
+  connection_.set_client_connection_id(TestConnectionId(0));
+
+  QuicNewConnectionIdFrame frame;
+  frame.sequence_number = 1u;
+  frame.connection_id = TestConnectionId(1);
+  frame.stateless_reset_token =
+      QuicUtils::GenerateStatelessResetToken(frame.connection_id);
+  frame.retire_prior_to = 0u;
+
+  EXPECT_TRUE(connection_.OnNewConnectionIdFrame(frame));
+  auto* retire_peer_issued_cid_alarm =
+      connection_.GetRetirePeerIssuedConnectionIdAlarm();
+  ASSERT_FALSE(retire_peer_issued_cid_alarm->IsSet());
+
+  frame.sequence_number = 2u;
+  frame.connection_id = TestConnectionId(2);
+  frame.stateless_reset_token =
+      QuicUtils::GenerateStatelessResetToken(frame.connection_id);
+  frame.retire_prior_to = 1u;  // CID associated with #1 will be retired.
+
+  EXPECT_TRUE(connection_.OnNewConnectionIdFrame(frame));
+  ASSERT_TRUE(retire_peer_issued_cid_alarm->IsSet());
+  EXPECT_EQ(connection_.client_connection_id(), TestConnectionId(0));
+
+  EXPECT_CALL(visitor_, SendRetireConnectionId(/*sequence_number=*/0u));
+  retire_peer_issued_cid_alarm->Fire();
+  EXPECT_EQ(connection_.client_connection_id(), TestConnectionId(2));
+  EXPECT_EQ(connection_.packet_creator().GetDestinationConnectionId(),
+            TestConnectionId(2));
+}
+
+TEST_P(QuicConnectionTest,
+       CloseConnectionAfterReceiveRetireConnectionIdWhenNoCIDIssued) {
+  if (!version().HasIetfQuicFrames()) {
+    return;
+  }
+  QuicConnectionPeer::EnableMultipleConnectionIdSupport(&connection_);
+  set_perspective(Perspective::IS_SERVER);
+
+  EXPECT_CALL(visitor_, BeforeConnectionCloseSent());
+  EXPECT_CALL(visitor_, OnConnectionClosed(_, ConnectionCloseSource::FROM_SELF))
+      .WillOnce(Invoke(this, &QuicConnectionTest::SaveConnectionCloseFrame));
+  QuicRetireConnectionIdFrame frame;
+  frame.sequence_number = 1u;
+
+  EXPECT_FALSE(connection_.OnRetireConnectionIdFrame(frame));
+
+  EXPECT_FALSE(connection_.connected());
+  EXPECT_THAT(saved_connection_close_frame_.quic_error_code,
+              IsError(IETF_QUIC_PROTOCOL_VIOLATION));
+}
+
+TEST_P(QuicConnectionTest, RetireConnectionIdFrameResultsInError) {
+  if (!version().HasIetfQuicFrames()) {
+    return;
+  }
+  QuicConnectionPeer::EnableMultipleConnectionIdSupport(&connection_);
+  set_perspective(Perspective::IS_SERVER);
+  connection_.CreateConnectionIdManager();
+
+  EXPECT_CALL(visitor_, OnServerConnectionIdIssued(_));
+  EXPECT_CALL(visitor_, SendNewConnectionId(_));
+  connection_.MaybeSendConnectionIdToClient();
+
+  EXPECT_CALL(visitor_, BeforeConnectionCloseSent());
+  EXPECT_CALL(visitor_, OnConnectionClosed(_, ConnectionCloseSource::FROM_SELF))
+      .WillOnce(Invoke(this, &QuicConnectionTest::SaveConnectionCloseFrame));
+  QuicRetireConnectionIdFrame frame;
+  frame.sequence_number = 2u;  // The corresponding ID is never issued.
+
+  EXPECT_FALSE(connection_.OnRetireConnectionIdFrame(frame));
+
+  EXPECT_FALSE(connection_.connected());
+  EXPECT_THAT(saved_connection_close_frame_.quic_error_code,
+              IsError(IETF_QUIC_PROTOCOL_VIOLATION));
+}
+
+TEST_P(QuicConnectionTest, ServerRetireSelfIssuedConnectionId) {
+  if (!version().HasIetfQuicFrames()) {
+    return;
+  }
+  QuicConnectionPeer::EnableMultipleConnectionIdSupport(&connection_);
+  set_perspective(Perspective::IS_SERVER);
+  connection_.CreateConnectionIdManager();
+  QuicConnectionId recorded_cid;
+  auto cid_recorder = [&recorded_cid](const QuicConnectionId& cid) {
+    recorded_cid = cid;
+  };
+  QuicConnectionId cid0 = connection_id_;
+  QuicConnectionId cid1;
+  QuicConnectionId cid2;
+
+  EXPECT_CALL(visitor_, OnServerConnectionIdIssued(_))
+      .WillOnce(Invoke(cid_recorder));
+  EXPECT_CALL(visitor_, SendNewConnectionId(_));
+  connection_.MaybeSendConnectionIdToClient();
+  cid1 = recorded_cid;
+
+  auto* retire_self_issued_cid_alarm =
+      connection_.GetRetireSelfIssuedConnectionIdAlarm();
+  ASSERT_FALSE(retire_self_issued_cid_alarm->IsSet());
+
+  QuicRetireConnectionIdFrame frame;
+  frame.sequence_number = 0u;
+  EXPECT_CALL(visitor_, OnServerConnectionIdIssued(_))
+      .WillOnce(Invoke(cid_recorder));
+  // RetireConnectionId trigers sending NewConnectionId immediately.
+  EXPECT_CALL(visitor_, SendNewConnectionId(_));
+  EXPECT_TRUE(connection_.OnRetireConnectionIdFrame(frame));
+  cid2 = recorded_cid;
+  // cid0 is not retired immediately.
+  EXPECT_THAT(connection_.GetActiveServerConnectionIds(),
+              ElementsAre(cid0, cid1, cid2));
+  ASSERT_TRUE(retire_self_issued_cid_alarm->IsSet());
+  // cid0 is retired when the retire CID alarm fires.
+  EXPECT_CALL(visitor_, OnServerConnectionIdRetired(cid0));
+  retire_self_issued_cid_alarm->Fire();
+  EXPECT_THAT(connection_.GetActiveServerConnectionIds(),
+              ElementsAre(cid1, cid2));
+}
+
 }  // namespace
 }  // namespace test
 }  // namespace quic
diff --git a/quic/core/quic_constants.h b/quic/core/quic_constants.h
index fcfec62..b750abb 100644
--- a/quic/core/quic_constants.h
+++ b/quic/core/quic_constants.h
@@ -100,6 +100,9 @@
 // Number of bytes reserved for version number in the packet header.
 const size_t kQuicVersionSize = 4;
 
+// Minimum number of active connection IDs that an end point can maintain.
+const uint32_t kMinNumOfActiveConnectionIds = 2;
+
 // Length of the retry integrity tag in bytes.
 // https://tools.ietf.org/html/draft-ietf-quic-transport-25#section-17.2.5
 const size_t kRetryIntegrityTagLength = 16;
diff --git a/quic/core/quic_flags_list.h b/quic/core/quic_flags_list.h
index 7f4f82d..3627a99 100644
--- a/quic/core/quic_flags_list.h
+++ b/quic/core/quic_flags_list.h
@@ -17,6 +17,7 @@
 QUIC_FLAG(FLAGS_quic_reloadable_flag_quic_can_send_ack_frequency, true)
 QUIC_FLAG(FLAGS_quic_reloadable_flag_quic_close_connection_on_0rtt_packet_number_higher_than_1rtt, true)
 QUIC_FLAG(FLAGS_quic_reloadable_flag_quic_close_connection_with_too_many_outstanding_packets, true)
+QUIC_FLAG(FLAGS_quic_reloadable_flag_quic_connection_support_multiple_cids, false)
 QUIC_FLAG(FLAGS_quic_reloadable_flag_quic_conservative_bursts, false)
 QUIC_FLAG(FLAGS_quic_reloadable_flag_quic_conservative_cwnd_and_pacing_gains, false)
 QUIC_FLAG(FLAGS_quic_reloadable_flag_quic_count_bytes_on_alternative_path_seperately, true)
diff --git a/quic/core/quic_session.cc b/quic/core/quic_session.cc
index fd41059..10b2025 100644
--- a/quic/core/quic_session.cc
+++ b/quic/core/quic_session.cc
@@ -142,6 +142,8 @@
     config_.SetMinAckDelayMs(kDefaultMinAckDelayTimeMs);
   }
 
+  connection_->CreateConnectionIdManager();
+
   // On the server side, version negotiation has been done by the dispatcher,
   // and the server session is created with the right version.
   if (perspective() == Perspective::IS_SERVER) {
@@ -2053,6 +2055,27 @@
   control_frame_manager_.WriteOrBufferAckFrequency(frame);
 }
 
+void QuicSession::SendNewConnectionId(const QuicNewConnectionIdFrame& frame) {
+  control_frame_manager_.WriteOrBufferNewConnectionId(
+      frame.connection_id, frame.sequence_number, frame.retire_prior_to,
+      frame.stateless_reset_token);
+}
+
+void QuicSession::SendRetireConnectionId(uint64_t sequence_number) {
+  control_frame_manager_.WriteOrBufferRetireConnectionId(sequence_number);
+}
+
+void QuicSession::OnServerConnectionIdIssued(
+    const QuicConnectionId& server_connection_id) {
+  visitor_->OnNewConnectionIdSent(connection_->connection_id(),
+                                  server_connection_id);
+}
+
+void QuicSession::OnServerConnectionIdRetired(
+    const QuicConnectionId& server_connection_id) {
+  visitor_->OnConnectionIdRetired(server_connection_id);
+}
+
 bool QuicSession::IsConnectionFlowControlBlocked() const {
   return flow_controller_.IsBlocked();
 }
diff --git a/quic/core/quic_session.h b/quic/core/quic_session.h
index cf7f9f7..f7eef2b 100644
--- a/quic/core/quic_session.h
+++ b/quic/core/quic_session.h
@@ -145,6 +145,12 @@
   // Adds a connection level WINDOW_UPDATE frame.
   void OnAckNeedsRetransmittableFrame() override;
   void SendAckFrequency(const QuicAckFrequencyFrame& frame) override;
+  void SendNewConnectionId(const QuicNewConnectionIdFrame& frame) override;
+  void SendRetireConnectionId(uint64_t sequence_number) override;
+  void OnServerConnectionIdIssued(
+      const QuicConnectionId& server_connection_id) override;
+  void OnServerConnectionIdRetired(
+      const QuicConnectionId& server_connection_id) override;
   bool WillingAndAbleToWrite() const override;
   std::string GetStreamsInfoForLogging() const override;
   void OnPathDegrading() override;
diff --git a/quic/test_tools/quic_connection_id_manager_peer.h b/quic/test_tools/quic_connection_id_manager_peer.h
new file mode 100644
index 0000000..ab749ed
--- /dev/null
+++ b/quic/test_tools/quic_connection_id_manager_peer.h
@@ -0,0 +1,29 @@
+// Copyright (c) 2013 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_TEST_TOOLS_QUIC_CONNECTION_ID_MANAGER_PEER_H_
+#define QUICHE_QUIC_TEST_TOOLS_QUIC_CONNECTION_ID_MANAGER_PEER_H_
+
+#include "quic/core/quic_connection_id_manager.h"
+
+namespace quic {
+namespace test {
+
+class QuicConnectionIdManagerPeer {
+ public:
+  static QuicAlarm* GetRetirePeerIssuedConnectionIdAlarm(
+      QuicPeerIssuedConnectionIdManager* manager) {
+    return manager->retire_connection_id_alarm_.get();
+  }
+
+  static QuicAlarm* GetRetireSelfIssuedConnectionIdAlarm(
+      QuicSelfIssuedConnectionIdManager* manager) {
+    return manager->retire_connection_id_alarm_.get();
+  }
+};
+
+}  // namespace test
+}  // namespace quic
+
+#endif  // QUICHE_QUIC_TEST_TOOLS_QUIC_CONNECTION_ID_MANAGER_PEER_H_
diff --git a/quic/test_tools/quic_connection_peer.cc b/quic/test_tools/quic_connection_peer.cc
index 8d4186e..3f1eaeb 100644
--- a/quic/test_tools/quic_connection_peer.cc
+++ b/quic/test_tools/quic_connection_peer.cc
@@ -10,6 +10,7 @@
 #include "quic/core/quic_received_packet_manager.h"
 #include "quic/platform/api/quic_flags.h"
 #include "quic/platform/api/quic_socket_address.h"
+#include "quic/test_tools/quic_connection_id_manager_peer.h"
 #include "quic/test_tools/quic_framer_peer.h"
 #include "quic/test_tools/quic_sent_packet_manager_peer.h"
 
@@ -164,6 +165,25 @@
 }
 
 // static
+QuicAlarm* QuicConnectionPeer::GetRetirePeerIssuedConnectionIdAlarm(
+    QuicConnection* connection) {
+  if (connection->peer_issued_cid_manager_ == nullptr) {
+    return nullptr;
+  }
+  return QuicConnectionIdManagerPeer::GetRetirePeerIssuedConnectionIdAlarm(
+      connection->peer_issued_cid_manager_.get());
+}
+// static
+QuicAlarm* QuicConnectionPeer::GetRetireSelfIssuedConnectionIdAlarm(
+    QuicConnection* connection) {
+  if (connection->self_issued_cid_manager_ == nullptr) {
+    return nullptr;
+  }
+  return QuicConnectionIdManagerPeer::GetRetireSelfIssuedConnectionIdAlarm(
+      connection->self_issued_cid_manager_.get());
+}
+
+// static
 QuicPacketWriter* QuicConnectionPeer::GetWriter(QuicConnection* connection) {
   return connection->writer_;
 }
@@ -447,5 +467,16 @@
   return connection->default_path_.bytes_received_before_address_validation;
 }
 
+// static
+void QuicConnectionPeer::EnableMultipleConnectionIdSupport(
+    QuicConnection* connection) {
+  connection->support_multiple_connection_ids_ = true;
+}
+
+// static
+void QuicConnectionPeer::ResetPeerIssuedConnectionIdManager(
+    QuicConnection* connection) {
+  connection->peer_issued_cid_manager_ = nullptr;
+}
 }  // namespace test
 }  // namespace quic
diff --git a/quic/test_tools/quic_connection_peer.h b/quic/test_tools/quic_connection_peer.h
index e76b5b4..1de3ae7 100644
--- a/quic/test_tools/quic_connection_peer.h
+++ b/quic/test_tools/quic_connection_peer.h
@@ -8,6 +8,7 @@
 #include <cstddef>
 #include "absl/strings/string_view.h"
 #include "quic/core/quic_connection.h"
+#include "quic/core/quic_connection_id.h"
 #include "quic/core/quic_connection_stats.h"
 #include "quic/core/quic_packets.h"
 #include "quic/core/quic_types.h"
@@ -88,6 +89,10 @@
       QuicConnection* connection);
   static QuicAlarm* GetDiscardZeroRttDecryptionKeysAlarm(
       QuicConnection* connection);
+  static QuicAlarm* GetRetirePeerIssuedConnectionIdAlarm(
+      QuicConnection* connection);
+  static QuicAlarm* GetRetireSelfIssuedConnectionIdAlarm(
+      QuicConnection* connection);
 
   static QuicPacketWriter* GetWriter(QuicConnection* connection);
   // If |owns_writer| is true, takes ownership of |writer|.
@@ -185,6 +190,10 @@
 
   static QuicByteCount BytesReceivedBeforeAddressValidation(
       QuicConnection* connection);
+
+  static void EnableMultipleConnectionIdSupport(QuicConnection* connection);
+
+  static void ResetPeerIssuedConnectionIdManager(QuicConnection* connection);
 };
 
 }  // namespace test
diff --git a/quic/test_tools/quic_test_utils.h b/quic/test_tools/quic_test_utils.h
index 6d5e52b..657d117 100644
--- a/quic/test_tools/quic_test_utils.h
+++ b/quic/test_tools/quic_test_utils.h
@@ -577,6 +577,22 @@
               SendAckFrequency,
               (const QuicAckFrequencyFrame& frame),
               (override));
+  MOCK_METHOD(void,
+              SendNewConnectionId,
+              (const QuicNewConnectionIdFrame& frame),
+              (override));
+  MOCK_METHOD(void,
+              SendRetireConnectionId,
+              (uint64_t sequence_number),
+              (override));
+  MOCK_METHOD(void,
+              OnServerConnectionIdIssued,
+              (const QuicConnectionId& server_connection_id),
+              (override));
+  MOCK_METHOD(void,
+              OnServerConnectionIdRetired,
+              (const QuicConnectionId& server_connection_id),
+              (override));
   MOCK_METHOD(bool, AllowSelfAddressChange, (), (const, override));
   MOCK_METHOD(HandshakeState, GetHandshakeState, (), (const, override));
   MOCK_METHOD(bool,
diff --git a/quic/test_tools/simulator/quic_endpoint.h b/quic/test_tools/simulator/quic_endpoint.h
index 3d81c70..8dab235 100644
--- a/quic/test_tools/simulator/quic_endpoint.h
+++ b/quic/test_tools/simulator/quic_endpoint.h
@@ -81,6 +81,13 @@
   void OnForwardProgressMadeAfterPathDegrading() override {}
   void OnAckNeedsRetransmittableFrame() override {}
   void SendAckFrequency(const QuicAckFrequencyFrame& /*frame*/) override {}
+  void SendNewConnectionId(const QuicNewConnectionIdFrame& /*frame*/) override {
+  }
+  void SendRetireConnectionId(uint64_t /*sequence_number*/) override {}
+  void OnServerConnectionIdIssued(
+      const QuicConnectionId& /*server_connection_id*/) override {}
+  void OnServerConnectionIdRetired(
+      const QuicConnectionId& /*server_connection_id*/) override {}
   bool AllowSelfAddressChange() const override;
   HandshakeState GetHandshakeState() const override;
   bool OnMaxStreamsFrame(const QuicMaxStreamsFrame& /*frame*/) override {