CONNECT-IP server support

This CL adds server-side support for draft-ietf-masque-connect-ip-03. It also fixes a few bugs found in client code now that we can test it. This was confirmed to work on the Privacy Proxy test VM (cf go/privacy-proxy-vm).

To test, run these commands:
# On CloudTop / workstation
blaze build //third_party/quic/masque:masque_server && gsutil cp blaze-bin/third_party/quic/masque/masque_server gs://pp-toybox-server

# On VM
gsutil cp gs://pp-toybox-server/masque_server . && chmod +x masque_server && ./masque_server --gid= --uid= --logtostderr --v=1

# On CloudTop / workstation
blaze run //third_party/quic/masque:masque_client -- --alsologtostderr --v=1 --disable_certificate_verification --masque_mode=connect-ip testvm.masque.uno:9661 https://test.privateoctopus.com:4433/

PiperOrigin-RevId: 485589201
diff --git a/quiche/quic/masque/masque_client_session.cc b/quiche/quic/masque/masque_client_session.cc
index 8e5bbfa..e9d6d8b 100644
--- a/quiche/quic/masque/masque_client_session.cc
+++ b/quiche/quic/masque/masque_client_session.cc
@@ -39,9 +39,7 @@
                             crypto_config, push_promise_index),
       masque_mode_(masque_mode),
       uri_template_(uri_template),
-      owner_(owner) {
-  connection->SetMaxPacketLength(1400);
-}
+      owner_(owner) {}
 
 void MasqueClientSession::OnMessageAcked(QuicMessageId message_id,
                                          QuicTime /*receive_timestamp*/) {
@@ -218,8 +216,9 @@
   MessageStatus message_status =
       SendHttp3Datagram(connect_ip->stream()->id(), http_payload);
 
-  QUIC_DVLOG(1) << "Sent IP packet with stream ID "
-                << connect_ip->stream()->id() << " and got message status "
+  QUIC_DVLOG(1) << "Sent encapsulated IP packet of length " << packet.size()
+                << " with stream ID " << connect_ip->stream()->id()
+                << " and got message status "
                 << MessageStatusToString(message_status);
 }
 
diff --git a/quiche/quic/masque/masque_encapsulated_client.cc b/quiche/quic/masque/masque_encapsulated_client.cc
index 2d26791..d384363 100644
--- a/quiche/quic/masque/masque_encapsulated_client.cc
+++ b/quiche/quic/masque/masque_encapsulated_client.cc
@@ -29,7 +29,8 @@
     odd_ = !odd_;
   }
   bool IngestData(size_t offset, size_t length) {
-    quiche::QuicheDataReader reader(writer_.data(), writer_.capacity());
+    quiche::QuicheDataReader reader(
+        writer_.data(), std::min<size_t>(offset + length, writer_.capacity()));
     if (!reader.Seek(offset) || reader.BytesRemaining() < length) {
       return false;
     }
@@ -39,7 +40,7 @@
       IngestUInt8(first_byte);
     }
     // Handle each 16-bit word at a time.
-    while (reader.BytesRemaining() > sizeof(uint16_t)) {
+    while (reader.BytesRemaining() >= sizeof(uint16_t)) {
       uint16_t word;
       if (!reader.ReadUInt16(&word)) {
         return false;
@@ -57,6 +58,7 @@
     while (accumulator_ >> 16 > 0) {
       accumulator_ = (accumulator_ & 0xffff) + (accumulator_ >> 16);
     }
+    accumulator_ = 0xffff & ~accumulator_;
     quiche::QuicheDataWriter writer2(writer_.capacity(), writer_.data());
     return writer2.Seek(offset) && writer2.WriteUInt16(accumulator_);
   }
diff --git a/quiche/quic/masque/masque_encapsulated_client_session.cc b/quiche/quic/masque/masque_encapsulated_client_session.cc
index ababe59..6b76417 100644
--- a/quiche/quic/masque/masque_encapsulated_client_session.cc
+++ b/quiche/quic/masque/masque_encapsulated_client_session.cc
@@ -18,9 +18,7 @@
     MasqueClientSession* masque_client_session)
     : QuicSpdyClientSession(config, supported_versions, connection, server_id,
                             crypto_config, push_promise_index),
-      masque_client_session_(masque_client_session) {
-  connection->SetMaxPacketLength(1250);
-}
+      masque_client_session_(masque_client_session) {}
 
 void MasqueEncapsulatedClientSession::ProcessPacket(
     absl::string_view packet, QuicSocketAddress server_address) {
@@ -50,17 +48,20 @@
     QUIC_DLOG(ERROR) << "Dropping empty CONNECT-IP packet";
     return;
   }
-  const uint8_t ip_version = first_byte >> 8;
-  absl::string_view quic_packet;
+  const uint8_t ip_version = first_byte >> 4;
   quiche::QuicheIpAddress server_ip;
   if (ip_version == 6) {
     if (!reader.Seek(5)) {
-      QUICHE_DLOG(ERROR) << "Failed to seek CONNECT-IP IPv6 start";
+      QUICHE_DLOG(ERROR) << "Failed to seek CONNECT-IP IPv6 start"
+                         << "\n"
+                         << quiche::QuicheTextUtils::HexDump(packet);
       return;
     }
     uint8_t next_header = 0;
     if (!reader.ReadUInt8(&next_header)) {
-      QUICHE_DLOG(ERROR) << "Failed to read CONNECT-IP next header";
+      QUICHE_DLOG(ERROR) << "Failed to read CONNECT-IP next header"
+                         << "\n"
+                         << quiche::QuicheTextUtils::HexDump(packet);
       return;
     }
     if (next_header != 17) {
@@ -68,100 +69,140 @@
       // do not expect to see them in practice.
       QUIC_DLOG(ERROR)
           << "Dropping CONNECT-IP packet with unexpected next header "
-          << static_cast<int>(next_header);
+          << static_cast<int>(next_header) << "\n"
+          << quiche::QuicheTextUtils::HexDump(packet);
       return;
     }
     if (!reader.Seek(1)) {
-      QUICHE_DLOG(ERROR) << "Failed to seek CONNECT-IP hop limit";
+      QUICHE_DLOG(ERROR) << "Failed to seek CONNECT-IP hop limit"
+                         << "\n"
+                         << quiche::QuicheTextUtils::HexDump(packet);
       return;
     }
     absl::string_view source_ip;
-    if (!reader.ReadBytes(&source_ip, 16)) {
-      QUICHE_DLOG(ERROR) << "Failed to read CONNECT-IP source IPv6";
+    if (!reader.ReadStringPiece(&source_ip, 16)) {
+      QUICHE_DLOG(ERROR) << "Failed to read CONNECT-IP source IPv6"
+                         << "\n"
+                         << quiche::QuicheTextUtils::HexDump(packet);
       return;
     }
     server_ip.FromPackedString(source_ip.data(), source_ip.length());
     if (!reader.Seek(16)) {
-      QUICHE_DLOG(ERROR) << "Failed to seek CONNECT-IP destination IPv6";
+      QUICHE_DLOG(ERROR) << "Failed to seek CONNECT-IP destination IPv6"
+                         << "\n"
+                         << quiche::QuicheTextUtils::HexDump(packet);
       return;
     }
   } else if (ip_version == 4) {
     uint8_t ihl = first_byte & 0xF;
     if (ihl < 5) {
       QUICHE_DLOG(ERROR) << "Dropping CONNECT-IP packet with invalid IHL "
-                         << static_cast<int>(ihl);
+                         << static_cast<int>(ihl) << "\n"
+                         << quiche::QuicheTextUtils::HexDump(packet);
       return;
     }
     if (!reader.Seek(8)) {
-      QUICHE_DLOG(ERROR) << "Failed to seek CONNECT-IP IPv4 start";
+      QUICHE_DLOG(ERROR) << "Failed to seek CONNECT-IP IPv4 start"
+                         << "\n"
+                         << quiche::QuicheTextUtils::HexDump(packet);
       return;
     }
     uint8_t ip_proto = 0;
     if (!reader.ReadUInt8(&ip_proto)) {
-      QUICHE_DLOG(ERROR) << "Failed to read CONNECT-IP ip_proto";
+      QUICHE_DLOG(ERROR) << "Failed to read CONNECT-IP ip_proto"
+                         << "\n"
+                         << quiche::QuicheTextUtils::HexDump(packet);
       return;
     }
     if (ip_proto != 17) {
       QUIC_DLOG(ERROR) << "Dropping CONNECT-IP packet with unexpected IP proto "
-                       << static_cast<int>(ip_proto);
+                       << static_cast<int>(ip_proto) << "\n"
+                       << quiche::QuicheTextUtils::HexDump(packet);
       return;
     }
     if (!reader.Seek(2)) {
-      QUICHE_DLOG(ERROR) << "Failed to seek CONNECT-IP IP checksum";
+      QUICHE_DLOG(ERROR) << "Failed to seek CONNECT-IP IP checksum"
+                         << "\n"
+                         << quiche::QuicheTextUtils::HexDump(packet);
       return;
     }
     absl::string_view source_ip;
-    if (!reader.ReadBytes(&source_ip, 4)) {
-      QUICHE_DLOG(ERROR) << "Failed to read CONNECT-IP source IPv4";
+    if (!reader.ReadStringPiece(&source_ip, 4)) {
+      QUICHE_DLOG(ERROR) << "Failed to read CONNECT-IP source IPv4"
+                         << "\n"
+                         << quiche::QuicheTextUtils::HexDump(packet);
       return;
     }
     server_ip.FromPackedString(source_ip.data(), source_ip.length());
     if (!reader.Seek(4)) {
-      QUICHE_DLOG(ERROR) << "Failed to seek CONNECT-IP destination IPv4";
+      QUICHE_DLOG(ERROR) << "Failed to seek CONNECT-IP destination IPv4"
+                         << "\n"
+                         << quiche::QuicheTextUtils::HexDump(packet);
       return;
     }
     uint8_t ip_options_length = (ihl - 5) * 4;
     if (!reader.Seek(ip_options_length)) {
       QUICHE_DLOG(ERROR) << "Failed to seek CONNECT-IP IP options of length "
-                         << static_cast<int>(ip_options_length);
+                         << static_cast<int>(ip_options_length) << "\n"
+                         << quiche::QuicheTextUtils::HexDump(packet);
       return;
     }
   } else {
     QUIC_DLOG(ERROR) << "Dropping CONNECT-IP packet with unexpected IP version "
-                     << static_cast<int>(ip_version);
+                     << static_cast<int>(ip_version) << "\n"
+                     << quiche::QuicheTextUtils::HexDump(packet);
     return;
   }
   // Parse UDP header.
   uint16_t server_port;
   if (!reader.ReadUInt16(&server_port)) {
-    QUICHE_DLOG(ERROR) << "Failed to read CONNECT-IP source port";
+    QUICHE_DLOG(ERROR) << "Failed to read CONNECT-IP source port"
+                       << "\n"
+                       << quiche::QuicheTextUtils::HexDump(packet);
     return;
   }
   if (!reader.Seek(2)) {
-    QUICHE_DLOG(ERROR) << "Failed to seek CONNECT-IP destination port";
+    QUICHE_DLOG(ERROR) << "Failed to seek CONNECT-IP destination port"
+                       << "\n"
+                       << quiche::QuicheTextUtils::HexDump(packet);
     return;
   }
   uint16_t udp_length;
   if (!reader.ReadUInt16(&udp_length)) {
-    QUICHE_DLOG(ERROR) << "Failed to read CONNECT-IP UDP length";
+    QUICHE_DLOG(ERROR) << "Failed to read CONNECT-IP UDP length"
+                       << "\n"
+                       << quiche::QuicheTextUtils::HexDump(packet);
+    return;
+  }
+  if (udp_length < 8) {
+    QUICHE_DLOG(ERROR) << "Dropping CONNECT-IP packet with invalid UDP length "
+                       << udp_length << "\n"
+                       << quiche::QuicheTextUtils::HexDump(packet);
     return;
   }
   if (!reader.Seek(2)) {
-    QUICHE_DLOG(ERROR) << "Failed to seek CONNECT-IP UDP checksum";
+    QUICHE_DLOG(ERROR) << "Failed to seek CONNECT-IP UDP checksum"
+                       << "\n"
+                       << quiche::QuicheTextUtils::HexDump(packet);
     return;
   }
-  if (!reader.ReadBytes(&quic_packet, udp_length)) {
-    QUICHE_DLOG(ERROR) << "Failed to read CONNECT-IP UDP payload";
+  absl::string_view quic_packet;
+  if (!reader.ReadStringPiece(&quic_packet, udp_length - 8)) {
+    QUICHE_DLOG(ERROR) << "Failed to read CONNECT-IP UDP payload"
+                       << "\n"
+                       << quiche::QuicheTextUtils::HexDump(packet);
     return;
   }
   if (!reader.IsDoneReading()) {
-    QUICHE_DLOG(INFO)
-        << "Received CONNECT-IP UDP packet with extra data after payload";
+    QUICHE_DLOG(INFO) << "Received CONNECT-IP UDP packet with "
+                      << reader.BytesRemaining()
+                      << " extra bytes after payload\n"
+                      << quiche::QuicheTextUtils::HexDump(packet);
   }
   QUIC_DLOG(INFO) << "Received CONNECT-IP encapsulated packet of length "
-                  << packet.size();
+                  << quic_packet.size();
   QuicTime now = connection()->clock()->ApproximateNow();
-  QuicReceivedPacket received_packet(quic_packet.data(), quic_packet.length(),
+  QuicReceivedPacket received_packet(quic_packet.data(), quic_packet.size(),
                                      now);
   QuicSocketAddress server_address = QuicSocketAddress(server_ip, server_port);
   connection()->ProcessUdpPacket(connection()->self_address(), server_address,
diff --git a/quiche/quic/masque/masque_server_backend.cc b/quiche/quic/masque/masque_server_backend.cc
index b42a055..d7ccb9a 100644
--- a/quiche/quic/masque/masque_server_backend.cc
+++ b/quiche/quic/masque/masque_server_backend.cc
@@ -13,6 +13,12 @@
                                          const std::string& server_authority,
                                          const std::string& cache_directory)
     : masque_mode_(masque_mode), server_authority_(server_authority) {
+  // Start with client IP 10.1.1.2.
+  connect_ip_next_client_ip_[0] = 10;
+  connect_ip_next_client_ip_[1] = 1;
+  connect_ip_next_client_ip_[2] = 1;
+  connect_ip_next_client_ip_[3] = 2;
+
   if (!cache_directory.empty()) {
     QuicMemoryCacheBackend::InitializeBackend(cache_directory);
   }
@@ -30,7 +36,8 @@
   std::string masque_path = "";
   auto protocol_pair = request_headers.find(":protocol");
   if (method != "CONNECT" || protocol_pair == request_headers.end() ||
-      protocol_pair->second != "connect-udp") {
+      (protocol_pair->second != "connect-udp" &&
+       protocol_pair->second != "connect-ip")) {
     // This is not a MASQUE request.
     return false;
   }
@@ -122,4 +129,25 @@
   backend_client_states_.erase(connection_id);
 }
 
+QuicIpAddress MasqueServerBackend::GetNextClientIpAddress() {
+  // Makes sure all addresses are in 10.(1-254).(1-254).(2-254)
+  QuicIpAddress address;
+  address.FromPackedString(
+      reinterpret_cast<char*>(&connect_ip_next_client_ip_[0]),
+      sizeof(connect_ip_next_client_ip_));
+  connect_ip_next_client_ip_[3]++;
+  if (connect_ip_next_client_ip_[3] >= 255) {
+    connect_ip_next_client_ip_[3] = 2;
+    connect_ip_next_client_ip_[2]++;
+    if (connect_ip_next_client_ip_[2] >= 255) {
+      connect_ip_next_client_ip_[2] = 1;
+      connect_ip_next_client_ip_[1]++;
+      if (connect_ip_next_client_ip_[1] >= 255) {
+        QUIC_LOG(FATAL) << "Ran out of IP addresses, restarting process.";
+      }
+    }
+  }
+  return address;
+}
+
 }  // namespace quic
diff --git a/quiche/quic/masque/masque_server_backend.h b/quiche/quic/masque/masque_server_backend.h
index cb4b067..50c5a02 100644
--- a/quiche/quic/masque/masque_server_backend.h
+++ b/quiche/quic/masque/masque_server_backend.h
@@ -53,6 +53,9 @@
   // Unregister backend client.
   void RemoveBackendClient(QuicConnectionId connection_id);
 
+  // Provides a unique client IP address for each CONNECT-IP client.
+  QuicIpAddress GetNextClientIpAddress();
+
  private:
   // Handle MASQUE request.
   bool MaybeHandleMasqueRequest(
@@ -69,6 +72,7 @@
   absl::flat_hash_map<QuicConnectionId, BackendClientState,
                       QuicConnectionIdHash>
       backend_client_states_;
+  uint8_t connect_ip_next_client_ip_[4];
 };
 
 }  // namespace quic
diff --git a/quiche/quic/masque/masque_server_session.cc b/quiche/quic/masque/masque_server_session.cc
index 03fc326..515d9ab 100644
--- a/quiche/quic/masque/masque_server_session.cc
+++ b/quiche/quic/masque/masque_server_session.cc
@@ -4,9 +4,17 @@
 
 #include "quiche/quic/masque/masque_server_session.h"
 
+#include <fcntl.h>
+#include <linux/if.h>
+#include <linux/if_tun.h>
 #include <netdb.h>
+#include <netinet/ip.h>
+#include <netinet/ip_icmp.h>
+#include <netinet/udp.h>
+#include <sys/ioctl.h>
 
 #include <cstddef>
+#include <cstdint>
 #include <limits>
 
 #include "absl/cleanup/cleanup.h"
@@ -18,8 +26,10 @@
 #include "quiche/quic/core/io/quic_event_loop.h"
 #include "quiche/quic/core/quic_data_reader.h"
 #include "quiche/quic/core/quic_udp_socket.h"
+#include "quiche/quic/platform/api/quic_ip_address.h"
 #include "quiche/quic/tools/quic_url.h"
 #include "quiche/common/platform/api/quiche_url_utils.h"
+#include "quiche/common/quiche_ip_address.h"
 
 namespace quic {
 
@@ -75,6 +85,79 @@
   return response;
 }
 
+int CreateTunInterface(const QuicIpAddress& client_address) {
+  if (!client_address.IsIPv4()) {
+    QUIC_LOG(ERROR) << "CreateTunInterface currently only supports IPv4";
+    return -1;
+  }
+  int tun_fd = open("/dev/net/tun", O_RDWR);
+  int ip_fd = -1;
+  do {
+    if (tun_fd < 0) {
+      QUIC_PLOG(ERROR) << "Failed to open clone device";
+      break;
+    }
+    struct ifreq ifr = {};
+    ifr.ifr_flags = IFF_TUN | IFF_NO_PI;
+    // If we want to pick a specific device name, we can set it via
+    // ifr.ifr_name. Otherwise, the kernel will pick the next available tunX
+    // name.
+    int err = ioctl(tun_fd, TUNSETIFF, &ifr);
+    if (err < 0) {
+      QUIC_PLOG(ERROR) << "TUNSETIFF failed";
+      break;
+    }
+    ip_fd = socket(AF_INET, SOCK_DGRAM, 0);
+    if (ip_fd < 0) {
+      QUIC_PLOG(ERROR) << "Failed to open IP configuration socket";
+      break;
+    }
+    struct sockaddr_in addr = {};
+    addr.sin_family = AF_INET;
+    // Local address, unused but needs to be set. We use the same address as the
+    // client address, but with last byte set to 1.
+    addr.sin_addr = client_address.GetIPv4();
+    addr.sin_addr.s_addr &= htonl(0xffffff00);
+    addr.sin_addr.s_addr |= htonl(0x00000001);
+    memcpy(&ifr.ifr_addr, &addr, sizeof(addr));
+    err = ioctl(ip_fd, SIOCSIFADDR, &ifr);
+    if (err < 0) {
+      QUIC_PLOG(ERROR) << "SIOCSIFADDR failed";
+      break;
+    }
+    // Peer address, needs to match source IP address of sent packets.
+    addr.sin_addr = client_address.GetIPv4();
+    memcpy(&ifr.ifr_addr, &addr, sizeof(addr));
+    err = ioctl(ip_fd, SIOCSIFDSTADDR, &ifr);
+    if (err < 0) {
+      QUIC_PLOG(ERROR) << "SIOCSIFDSTADDR failed";
+      break;
+    }
+    err = ioctl(ip_fd, SIOCGIFFLAGS, &ifr);
+    if (err < 0) {
+      QUIC_PLOG(ERROR) << "SIOCGIFFLAGS failed";
+      break;
+    }
+    ifr.ifr_flags |= (IFF_UP | IFF_RUNNING);
+    err = ioctl(ip_fd, SIOCSIFFLAGS, &ifr);
+    if (err < 0) {
+      QUIC_PLOG(ERROR) << "SIOCSIFFLAGS failed";
+      break;
+    }
+    close(ip_fd);
+    QUIC_DLOG(INFO) << "Successfully created TUN interface " << ifr.ifr_name
+                    << " with fd " << tun_fd;
+    return tun_fd;
+  } while (false);
+  if (tun_fd >= 0) {
+    close(tun_fd);
+  }
+  if (ip_fd >= 0) {
+    close(ip_fd);
+  }
+  return -1;
+}
+
 }  // namespace
 
 MasqueServerSession::MasqueServerSession(
@@ -94,7 +177,7 @@
   // Artificially increase the max packet length to 1350 to ensure we can fit
   // QUIC packets inside DATAGRAM frames.
   // TODO(b/181606597) Remove this workaround once we use PMTUD.
-  connection->SetMaxPacketLength(kDefaultMaxPacketSize);
+  connection->SetMaxPacketLength(kMasqueMaxOuterPacketSize);
 
   masque_server_backend_->RegisterBackendClient(connection_id(), this);
   QUICHE_DCHECK_NE(event_loop_, nullptr);
@@ -123,6 +206,10 @@
       [stream_id](const ConnectUdpServerState& connect_udp) {
         return connect_udp.stream()->id() == stream_id;
       });
+  connect_ip_server_states_.remove_if(
+      [stream_id](const ConnectIpServerState& connect_ip) {
+        return connect_ip.stream()->id() == stream_id;
+      });
 
   QuicSimpleServerSession::OnStreamClosed(stream_id);
 }
@@ -171,11 +258,48 @@
     QUIC_DLOG(ERROR) << "MASQUE request with bad method \"" << method << "\"";
     return CreateBackendErrorResponse("400", "Bad method");
   }
-  if (protocol != "connect-udp") {
+  if (protocol != "connect-udp" && protocol != "connect-ip") {
     QUIC_DLOG(ERROR) << "MASQUE request with bad protocol \"" << protocol
                      << "\"";
     return CreateBackendErrorResponse("400", "Bad protocol");
   }
+  if (protocol == "connect-ip") {
+    QuicSpdyStream* stream = static_cast<QuicSpdyStream*>(
+        GetActiveStream(request_handler->stream_id()));
+    if (stream == nullptr) {
+      QUIC_BUG(bad masque server stream type)
+          << "Unexpected stream type for stream ID "
+          << request_handler->stream_id();
+      return CreateBackendErrorResponse("500", "Bad stream type");
+    }
+    QuicIpAddress client_ip = masque_server_backend_->GetNextClientIpAddress();
+    QUIC_DLOG(INFO) << "Using client IP " << client_ip.ToString()
+                    << " for CONNECT-IP stream ID "
+                    << request_handler->stream_id();
+    int fd = CreateTunInterface(client_ip);
+    if (fd < 0) {
+      QUIC_LOG(ERROR) << "Failed to create TUN interface for stream ID "
+                      << request_handler->stream_id();
+      return CreateBackendErrorResponse("500",
+                                        "Failed to create TUN interface");
+    }
+    if (!event_loop_->RegisterSocket(fd, kSocketEventReadable, this)) {
+      QUIC_DLOG(ERROR) << "Failed to register TUN fd with the event loop";
+      close(fd);
+      return CreateBackendErrorResponse("500", "Registering TUN socket failed");
+    }
+    connect_ip_server_states_.push_back(
+        ConnectIpServerState(client_ip, stream, fd, this));
+
+    spdy::Http2HeaderBlock response_headers;
+    response_headers[":status"] = "200";
+    auto response = std::make_unique<QuicBackendResponse>();
+    response->set_response_type(QuicBackendResponse::INCOMPLETE_RESPONSE);
+    response->set_headers(std::move(response_headers));
+    response->set_body("");
+
+    return response;
+  }
   // Extract target host and port from path using default template.
   std::vector<absl::string_view> path_split = absl::StrSplit(path, '/');
   if (path_split.size() != 7 || !path_split[0].empty() ||
@@ -270,8 +394,37 @@
                               return connect_udp.fd() == fd;
                             });
   if (it == connect_udp_server_states_.end()) {
-    QUIC_BUG(quic_bug_10974_1)
-        << "Got unexpected event mask " << events << " on unknown fd " << fd;
+    auto it2 = absl::c_find_if(connect_ip_server_states_,
+                               [fd](const ConnectIpServerState& connect_ip) {
+                                 return connect_ip.fd() == fd;
+                               });
+    if (it2 == connect_ip_server_states_.end()) {
+      QUIC_BUG(quic_bug_10974_1)
+          << "Got unexpected event mask " << events << " on unknown fd " << fd;
+      return;
+    }
+
+    char datagram[1501];
+    datagram[0] = 0;  // Context ID.
+    while (true) {
+      ssize_t read_size = read(fd, datagram + 1, sizeof(datagram) - 1);
+      if (read_size < 0) {
+        break;
+      }
+      MessageStatus message_status = it2->stream()->SendHttp3Datagram(
+          absl::string_view(datagram, 1 + read_size));
+      QUIC_DVLOG(1) << "Encapsulated IP packet of length " << read_size
+                    << " with stream ID " << it2->stream()->id()
+                    << " and got message status "
+                    << MessageStatusToString(message_status);
+    }
+    if (!event_loop_->SupportsEdgeTriggered()) {
+      if (!event_loop_->RearmSocket(fd, kSocketEventReadable)) {
+        QUIC_BUG(MasqueServerSession_ConnectIp_OnSocketEvent_Rearm)
+            << "Failed to re-arm socket " << fd << " for reading";
+      }
+    }
+
     return;
   }
 
@@ -430,4 +583,123 @@
                 << target_server_address_ << " with result " << write_result;
 }
 
+MasqueServerSession::ConnectIpServerState::ConnectIpServerState(
+    QuicIpAddress client_ip, QuicSpdyStream* stream, QuicUdpSocketFd fd,
+    MasqueServerSession* masque_session)
+    : client_ip_(client_ip),
+      stream_(stream),
+      fd_(fd),
+      masque_session_(masque_session) {
+  QUICHE_DCHECK(client_ip_.IsIPv4());
+  QUICHE_DCHECK_NE(fd_, kQuicInvalidSocketFd);
+  QUICHE_DCHECK_NE(masque_session_, nullptr);
+  this->stream()->RegisterHttp3DatagramVisitor(this);
+  this->stream()->RegisterConnectIpVisitor(this);
+}
+
+MasqueServerSession::ConnectIpServerState::~ConnectIpServerState() {
+  if (stream() != nullptr) {
+    stream()->UnregisterHttp3DatagramVisitor();
+    stream()->UnregisterConnectIpVisitor();
+  }
+  if (fd_ == kQuicInvalidSocketFd) {
+    return;
+  }
+  QuicUdpSocketApi socket_api;
+  QUIC_DLOG(INFO) << "Closing fd " << fd_;
+  if (!masque_session_->event_loop()->UnregisterSocket(fd_)) {
+    QUIC_DLOG(ERROR) << "Failed to unregister FD " << fd_;
+  }
+  socket_api.Destroy(fd_);
+}
+
+MasqueServerSession::ConnectIpServerState::ConnectIpServerState(
+    MasqueServerSession::ConnectIpServerState&& other) {
+  fd_ = kQuicInvalidSocketFd;
+  *this = std::move(other);
+}
+
+MasqueServerSession::ConnectIpServerState&
+MasqueServerSession::ConnectIpServerState::operator=(
+    MasqueServerSession::ConnectIpServerState&& other) {
+  if (fd_ != kQuicInvalidSocketFd) {
+    QuicUdpSocketApi socket_api;
+    QUIC_DLOG(INFO) << "Closing fd " << fd_;
+    if (!masque_session_->event_loop()->UnregisterSocket(fd_)) {
+      QUIC_DLOG(ERROR) << "Failed to unregister FD " << fd_;
+    }
+    socket_api.Destroy(fd_);
+  }
+  client_ip_ = other.client_ip_;
+  stream_ = other.stream_;
+  other.stream_ = nullptr;
+  fd_ = other.fd_;
+  masque_session_ = other.masque_session_;
+  other.fd_ = kQuicInvalidSocketFd;
+  if (stream() != nullptr) {
+    stream()->ReplaceHttp3DatagramVisitor(this);
+    stream()->ReplaceConnectIpVisitor(this);
+  }
+  return *this;
+}
+
+void MasqueServerSession::ConnectIpServerState::OnHttp3Datagram(
+    QuicStreamId stream_id, absl::string_view payload) {
+  QUICHE_DCHECK_EQ(stream_id, stream()->id());
+  QuicDataReader reader(payload);
+  uint64_t context_id;
+  if (!reader.ReadVarInt62(&context_id)) {
+    QUIC_DLOG(ERROR) << "Failed to read context ID";
+    return;
+  }
+  if (context_id != 0) {
+    QUIC_DLOG(ERROR) << "Ignoring HTTP Datagram with unexpected context ID "
+                     << context_id;
+    return;
+  }
+  absl::string_view ip_packet = reader.ReadRemainingPayload();
+  ssize_t written = write(fd(), ip_packet.data(), ip_packet.size());
+  if (written != static_cast<ssize_t>(ip_packet.size())) {
+    QUIC_DLOG(ERROR) << "Failed to write CONNECT-IP packet of length "
+                     << ip_packet.size();
+  } else {
+    QUIC_DLOG(INFO) << "Decapsulated CONNECT-IP packet of length "
+                    << ip_packet.size();
+  }
+}
+
+bool MasqueServerSession::ConnectIpServerState::OnAddressAssignCapsule(
+    const AddressAssignCapsule& /*capsule*/) {
+  return true;
+}
+
+bool MasqueServerSession::ConnectIpServerState::OnAddressRequestCapsule(
+    const AddressRequestCapsule& /*capsule*/) {
+  return true;
+}
+
+bool MasqueServerSession::ConnectIpServerState::OnRouteAdvertisementCapsule(
+    const RouteAdvertisementCapsule& /*capsule*/) {
+  return true;
+}
+
+void MasqueServerSession::ConnectIpServerState::OnHeadersWritten() {
+  QUICHE_DCHECK(client_ip_.IsIPv4()) << client_ip_.ToString();
+  Capsule address_assign_capsule = Capsule::AddressAssign();
+  PrefixWithId assigned_address;
+  assigned_address.ip_prefix = quiche::QuicheIpPrefix(client_ip_, 32);
+  assigned_address.request_id = 0;
+  address_assign_capsule.address_assign_capsule().assigned_addresses.push_back(
+      assigned_address);
+  stream()->WriteCapsule(address_assign_capsule);
+  IpAddressRange default_route;
+  default_route.start_ip_address.FromString("0.0.0.0");
+  default_route.end_ip_address.FromString("255.255.255.255");
+  default_route.ip_protocol = 0;
+  Capsule route_advertisement = Capsule::RouteAdvertisement();
+  route_advertisement.route_advertisement_capsule().ip_address_ranges.push_back(
+      default_route);
+  stream()->WriteCapsule(route_advertisement);
+}
+
 }  // namespace quic
diff --git a/quiche/quic/masque/masque_server_session.h b/quiche/quic/masque/masque_server_session.h
index f26ef4f..77fdc73 100644
--- a/quiche/quic/masque/masque_server_session.h
+++ b/quiche/quic/masque/masque_server_session.h
@@ -90,6 +90,46 @@
     MasqueServerSession* masque_session_;  // Unowned.
   };
 
+  // State that the MasqueServerSession keeps for each CONNECT-IP request.
+  class QUIC_NO_EXPORT ConnectIpServerState
+      : public QuicSpdyStream::Http3DatagramVisitor,
+        public QuicSpdyStream::ConnectIpVisitor {
+   public:
+    // ConnectIpServerState takes ownership of |fd|. It will unregister it
+    // from |event_loop| and close the file descriptor when destructed.
+    explicit ConnectIpServerState(QuicIpAddress client_ip,
+                                  QuicSpdyStream* stream, QuicUdpSocketFd fd,
+                                  MasqueServerSession* masque_session);
+
+    ~ConnectIpServerState();
+
+    // Disallow copy but allow move.
+    ConnectIpServerState(const ConnectIpServerState&) = delete;
+    ConnectIpServerState(ConnectIpServerState&&);
+    ConnectIpServerState& operator=(const ConnectIpServerState&) = delete;
+    ConnectIpServerState& operator=(ConnectIpServerState&&);
+
+    QuicSpdyStream* stream() const { return stream_; }
+    QuicUdpSocketFd fd() const { return fd_; }
+
+    // From QuicSpdyStream::Http3DatagramVisitor.
+    void OnHttp3Datagram(QuicStreamId stream_id,
+                         absl::string_view payload) override;
+
+    // From QuicSpdyStream::ConnectIpVisitor.
+    bool OnAddressAssignCapsule(const AddressAssignCapsule& capsule) override;
+    bool OnAddressRequestCapsule(const AddressRequestCapsule& capsule) override;
+    bool OnRouteAdvertisementCapsule(
+        const RouteAdvertisementCapsule& capsule) override;
+    void OnHeadersWritten() override;
+
+   private:
+    QuicIpAddress client_ip_;
+    QuicSpdyStream* stream_;
+    QuicUdpSocketFd fd_;                   // Owned.
+    MasqueServerSession* masque_session_;  // Unowned.
+  };
+
   // From QuicSpdySession.
   bool OnSettingsFrame(const SettingsFrame& frame) override;
   HttpDatagramSupport LocalHttpDatagramSupport() override {
@@ -100,6 +140,7 @@
   QuicEventLoop* event_loop_;                   // Unowned.
   MasqueMode masque_mode_;
   std::list<ConnectUdpServerState> connect_udp_server_states_;
+  std::list<ConnectIpServerState> connect_ip_server_states_;
   bool masque_initialized_ = false;
 };
 
diff --git a/quiche/quic/masque/masque_utils.h b/quiche/quic/masque/masque_utils.h
index e55b808..09dbe63 100644
--- a/quiche/quic/masque/masque_utils.h
+++ b/quiche/quic/masque/masque_utils.h
@@ -19,7 +19,7 @@
 
 // Maximum packet size for encapsulated connections.
 enum : QuicByteCount {
-  kMasqueMaxEncapsulatedPacketSize = 1300,
+  kMasqueMaxEncapsulatedPacketSize = 1250,
   kMasqueMaxOuterPacketSize = 1350,
 };