masque_sever_session: unnest different MASQUE implementations

This puts all current and future MASQUE implementations on equal
footing and makes it easier to refactor those implementation into
different functions later.

PiperOrigin-RevId: 530397999
diff --git a/quiche/quic/masque/masque_server_session.cc b/quiche/quic/masque/masque_server_session.cc
index 493b9f6..5d51677 100644
--- a/quiche/quic/masque/masque_server_session.cc
+++ b/quiche/quic/masque/masque_server_session.cc
@@ -323,44 +323,6 @@
     QUIC_DVLOG(1) << "Ignoring OnEvent fd " << fd << " event mask " << events;
     return;
   }
-  auto it = absl::c_find_if(connect_udp_server_states_,
-                            [fd](const ConnectUdpServerState& connect_udp) {
-                              return connect_udp.fd() == fd;
-                            });
-  if (it == connect_udp_server_states_.end()) {
-    auto it2 = absl::c_find_if(connect_ip_server_states_,
-                               [fd](const ConnectIpServerState& connect_ip) {
-                                 return connect_ip.fd() == fd;
-                               });
-    if (it2 == connect_ip_server_states_.end()) {
-      QUIC_BUG(quic_bug_10974_1)
-          << "Got unexpected event mask " << events << " on unknown fd " << fd;
-      return;
-    }
-
-    char datagram[1501];
-    datagram[0] = 0;  // Context ID.
-    while (true) {
-      ssize_t read_size = read(fd, datagram + 1, sizeof(datagram) - 1);
-      if (read_size < 0) {
-        break;
-      }
-      MessageStatus message_status = it2->stream()->SendHttp3Datagram(
-          absl::string_view(datagram, 1 + read_size));
-      QUIC_DVLOG(1) << "Encapsulated IP packet of length " << read_size
-                    << " with stream ID " << it2->stream()->id()
-                    << " and got message status "
-                    << MessageStatusToString(message_status);
-    }
-    if (!event_loop_->SupportsEdgeTriggered()) {
-      if (!event_loop_->RearmSocket(fd, kSocketEventReadable)) {
-        QUIC_BUG(MasqueServerSession_ConnectIp_OnSocketEvent_Rearm)
-            << "Failed to re-arm socket " << fd << " for reading";
-      }
-    }
-
-    return;
-  }
 
   auto rearm = absl::MakeCleanup([&]() {
     if (!event_loop_->SupportsEdgeTriggered()) {
@@ -371,57 +333,97 @@
     }
   });
 
-  QuicSocketAddress expected_target_server_address =
-      it->target_server_address();
-  QUICHE_DCHECK(expected_target_server_address.IsInitialized());
-  QUIC_DVLOG(1) << "Received readable event on fd " << fd << " (mask " << events
-                << ") stream ID " << it->stream()->id() << " server "
-                << expected_target_server_address;
-  QuicUdpSocketApi socket_api;
-  BitMask64 packet_info_interested(QuicUdpPacketInfoBit::PEER_ADDRESS);
-  char packet_buffer[1 + kMaxIncomingPacketSize];
-  packet_buffer[0] = 0;  // context ID.
-  char control_buffer[kDefaultUdpPacketControlBufferSize];
-  while (true) {
-    QuicUdpSocketApi::ReadPacketResult read_result;
-    read_result.packet_buffer = {packet_buffer + 1, sizeof(packet_buffer) - 1};
-    read_result.control_buffer = {control_buffer, sizeof(control_buffer)};
-    socket_api.ReadPacket(fd, packet_info_interested, &read_result);
-    if (!read_result.ok) {
-      // Most likely there is nothing left to read, break out of read loop.
+  do {  // CONNECT-UDP
+    auto it = absl::c_find_if(connect_udp_server_states_,
+                              [fd](const ConnectUdpServerState& connect_udp) {
+                                return connect_udp.fd() == fd;
+                              });
+    if (it == connect_udp_server_states_.end()) {
       break;
     }
-    if (!read_result.packet_info.HasValue(QuicUdpPacketInfoBit::PEER_ADDRESS)) {
-      QUIC_BUG(quic_bug_10974_2)
-          << "Missing peer address when reading from fd " << fd;
-      continue;
+    QuicSocketAddress expected_target_server_address =
+        it->target_server_address();
+    QUICHE_DCHECK(expected_target_server_address.IsInitialized());
+    QUIC_DVLOG(1) << "Received readable event on fd " << fd << " (mask "
+                  << events << ") stream ID " << it->stream()->id()
+                  << " server " << expected_target_server_address;
+    QuicUdpSocketApi socket_api;
+    BitMask64 packet_info_interested(QuicUdpPacketInfoBit::PEER_ADDRESS);
+    char packet_buffer[1 + kMaxIncomingPacketSize];
+    packet_buffer[0] = 0;  // context ID.
+    char control_buffer[kDefaultUdpPacketControlBufferSize];
+    while (true) {
+      QuicUdpSocketApi::ReadPacketResult read_result;
+      read_result.packet_buffer = {packet_buffer + 1,
+                                   sizeof(packet_buffer) - 1};
+      read_result.control_buffer = {control_buffer, sizeof(control_buffer)};
+      socket_api.ReadPacket(fd, packet_info_interested, &read_result);
+      if (!read_result.ok) {
+        // Most likely there is nothing left to read, break out of read loop.
+        break;
+      }
+      if (!read_result.packet_info.HasValue(
+              QuicUdpPacketInfoBit::PEER_ADDRESS)) {
+        QUIC_BUG(quic_bug_10974_2)
+            << "Missing peer address when reading from fd " << fd;
+        continue;
+      }
+      if (read_result.packet_info.peer_address() !=
+          expected_target_server_address) {
+        QUIC_DLOG(ERROR) << "Ignoring UDP packet on fd " << fd
+                         << " from unexpected server address "
+                         << read_result.packet_info.peer_address()
+                         << " (expected " << expected_target_server_address
+                         << ")";
+        continue;
+      }
+      if (!connection()->connected()) {
+        QUIC_BUG(quic_bug_10974_3)
+            << "Unexpected incoming UDP packet on fd " << fd << " from "
+            << expected_target_server_address
+            << " because MASQUE connection is closed";
+        return;
+      }
+      // The packet is valid, send it to the client in a DATAGRAM frame.
+      MessageStatus message_status =
+          it->stream()->SendHttp3Datagram(absl::string_view(
+              packet_buffer, read_result.packet_buffer.buffer_len + 1));
+      QUIC_DVLOG(1) << "Sent UDP packet from " << expected_target_server_address
+                    << " of length " << read_result.packet_buffer.buffer_len
+                    << " with stream ID " << it->stream()->id()
+                    << " and got message status "
+                    << MessageStatusToString(message_status);
     }
-    if (read_result.packet_info.peer_address() !=
-        expected_target_server_address) {
-      QUIC_DLOG(ERROR) << "Ignoring UDP packet on fd " << fd
-                       << " from unexpected server address "
-                       << read_result.packet_info.peer_address()
-                       << " (expected " << expected_target_server_address
-                       << ")";
-      continue;
+  } while (false);
+
+  do {  // CONNECT-IP
+    auto it = absl::c_find_if(connect_ip_server_states_,
+                              [fd](const ConnectIpServerState& connect_ip) {
+                                return connect_ip.fd() == fd;
+                              });
+    if (it == connect_ip_server_states_.end()) {
+      break;
     }
-    if (!connection()->connected()) {
-      QUIC_BUG(quic_bug_10974_3)
-          << "Unexpected incoming UDP packet on fd " << fd << " from "
-          << expected_target_server_address
-          << " because MASQUE connection is closed";
-      return;
+    char datagram[1501];
+    datagram[0] = 0;  // Context ID.
+    while (true) {
+      ssize_t read_size = read(fd, datagram + 1, sizeof(datagram) - 1);
+      if (read_size < 0) {
+        break;
+      }
+      MessageStatus message_status = it->stream()->SendHttp3Datagram(
+          absl::string_view(datagram, 1 + read_size));
+      QUIC_DVLOG(1) << "Encapsulated IP packet of length " << read_size
+                    << " with stream ID " << it->stream()->id()
+                    << " and got message status "
+                    << MessageStatusToString(message_status);
     }
-    // The packet is valid, send it to the client in a DATAGRAM frame.
-    MessageStatus message_status =
-        it->stream()->SendHttp3Datagram(absl::string_view(
-            packet_buffer, read_result.packet_buffer.buffer_len + 1));
-    QUIC_DVLOG(1) << "Sent UDP packet from " << expected_target_server_address
-                  << " of length " << read_result.packet_buffer.buffer_len
-                  << " with stream ID " << it->stream()->id()
-                  << " and got message status "
-                  << MessageStatusToString(message_status);
-  }
+    return;
+  } while (false);
+
+  QUIC_BUG(quic_bug_10974_1)
+      << "Got unexpected event mask " << events << " on unknown fd " << fd;
+  std::move(rearm).Cancel();
 }
 
 bool MasqueServerSession::OnSettingsFrame(const SettingsFrame& frame) {