Support the use of MESSAGE/DATAGRAM frames in QBONE.

Received MESSAGE frames are always processed.  By default, sending MESSAGE frames is disabled, but can be enabled via flags.

gfe-relnote: n/a (QBONE-only change)
PiperOrigin-RevId: 267679822
Change-Id: I1263435cc16c17bd6ff8efa82d78b11c761164b2
diff --git a/quic/qbone/qbone_session_base.cc b/quic/qbone/qbone_session_base.cc
index 23b29d2..01917d2 100644
--- a/quic/qbone/qbone_session_base.cc
+++ b/quic/qbone/qbone_session_base.cc
@@ -81,6 +81,11 @@
   QuicSession::OnStreamFrame(frame);
 }
 
+void QboneSessionBase::OnMessageReceived(QuicStringPiece message) {
+  ++num_message_packets_;
+  ProcessPacketFromPeer(message);
+}
+
 QuicStream* QboneSessionBase::CreateIncomingStream(QuicStreamId id) {
   return ActivateDataStream(CreateDataStream(id));
 }
@@ -122,6 +127,23 @@
 }
 
 void QboneSessionBase::SendPacketToPeer(QuicStringPiece packet) {
+  if (crypto_stream_ == nullptr) {
+    QUIC_BUG << "Attempting to send packet before encryption established";
+    return;
+  }
+
+  if (send_packets_as_messages_) {
+    QuicMemSlice slice(connection()->helper()->GetStreamSendBufferAllocator(),
+                       packet.size());
+    memcpy(const_cast<char*>(slice.data()), packet.data(), packet.size());
+    if (SendMessage(QuicMemSliceSpan(&slice)).status ==
+        MESSAGE_STATUS_SUCCESS) {
+      return;
+    }
+    // If SendMessage() fails for any reason, fall back to ephemeral streams.
+    num_fallback_to_stream_++;
+  }
+
   // Qbone streams are ephemeral.
   QuicStream* stream = CreateOutgoingStream();
   if (!stream) {
@@ -142,6 +164,14 @@
   return num_streamed_packets_;
 }
 
+uint64_t QboneSessionBase::GetNumMessagePackets() const {
+  return num_message_packets_;
+}
+
+uint64_t QboneSessionBase::GetNumFallbackToStream() const {
+  return num_fallback_to_stream_;
+}
+
 void QboneSessionBase::set_writer(QbonePacketWriter* writer) {
   writer_ = writer;
   testing::testvalue::Adjust("quic_QbonePacketWriter", &writer_);
diff --git a/quic/qbone/qbone_session_base.h b/quic/qbone/qbone_session_base.h
index 5bfc9be..41afc6b 100644
--- a/quic/qbone/qbone_session_base.h
+++ b/quic/qbone/qbone_session_base.h
@@ -35,6 +35,8 @@
   void CloseStream(QuicStreamId stream_id) override;
   // This will check if the packet is wholly contained.
   void OnStreamFrame(const QuicStreamFrame& frame) override;
+  // Called whenever a MESSAGE frame is received.
+  void OnMessageReceived(QuicStringPiece message) override;
 
   virtual void ProcessPacketFromNetwork(QuicStringPiece packet) = 0;
   virtual void ProcessPacketFromPeer(QuicStringPiece packet) = 0;
@@ -48,7 +50,18 @@
   // multiple packets, requiring the creation of a QboneReadOnlyStream.
   uint64_t GetNumStreamedPackets() const;
 
+  // Returns the number of QBONE network packets that were received using QUIC
+  // MESSAGE frame.
+  uint64_t GetNumMessagePackets() const;
+
+  // Returns the number of times sending a MESSAGE frame failed, and the session
+  // used an ephemeral stream instead.
+  uint64_t GetNumFallbackToStream() const;
+
   void set_writer(QbonePacketWriter* writer);
+  void set_send_packets_as_messages(bool send_packets_as_messages) {
+    send_packets_as_messages_ = send_packets_as_messages;
+  }
 
  protected:
   virtual std::unique_ptr<QuicCryptoStream> CreateCryptoStream() = 0;
@@ -79,12 +92,24 @@
 
   QbonePacketWriter* writer_;
 
+  // If true, MESSAGE frames are used for short datagrams.  If false, ephemeral
+  // streams are used instead.  Note that receiving MESSAGE frames is always
+  // supported.
+  bool send_packets_as_messages_ = false;
+
  private:
   // Used for the crypto handshake.
   std::unique_ptr<QuicCryptoStream> crypto_stream_;
 
+  // Statistics for the packets received by the session.
   uint64_t num_ephemeral_packets_ = 0;
+  uint64_t num_message_packets_ = 0;
   uint64_t num_streamed_packets_ = 0;
+
+  // Number of times the connection has failed to send packets as MESSAGE frame
+  // and used streams as a fallback.
+  uint64_t num_fallback_to_stream_ = 0;
+
   QuicUnorderedSet<QuicStreamId> reliable_streams_;
 };
 
diff --git a/quic/qbone/qbone_session_test.cc b/quic/qbone/qbone_session_test.cc
index 016b2f3..b58f39e 100644
--- a/quic/qbone/qbone_session_test.cc
+++ b/quic/qbone/qbone_session_test.cc
@@ -354,7 +354,7 @@
 
   // Test handshake establishment and sending/receiving of data for two
   // directions.
-  void TestStreamConnection() {
+  void TestStreamConnection(bool use_messages) {
     ASSERT_TRUE(server_peer_->IsCryptoHandshakeConfirmed());
     ASSERT_TRUE(client_peer_->IsCryptoHandshakeConfirmed());
     ASSERT_TRUE(server_peer_->IsEncryptionEstablished());
@@ -406,10 +406,19 @@
     EXPECT_THAT(server_writer_->data(), Contains(TestPacketOut(long_data)));
     EXPECT_THAT(client_peer_->GetNumSentClientHellos(), Eq(2));
     EXPECT_THAT(client_peer_->GetNumReceivedServerConfigUpdates(), Eq(0));
-    EXPECT_THAT(client_peer_->GetNumEphemeralPackets(), Eq(2));
     EXPECT_THAT(client_peer_->GetNumStreamedPackets(), Eq(1));
-    EXPECT_THAT(server_peer_->GetNumEphemeralPackets(), Eq(2));
     EXPECT_THAT(server_peer_->GetNumStreamedPackets(), Eq(1));
+    if (use_messages) {
+      EXPECT_THAT(client_peer_->GetNumEphemeralPackets(), Eq(0));
+      EXPECT_THAT(server_peer_->GetNumEphemeralPackets(), Eq(0));
+      EXPECT_THAT(client_peer_->GetNumMessagePackets(), Eq(2));
+      EXPECT_THAT(server_peer_->GetNumMessagePackets(), Eq(2));
+    } else {
+      EXPECT_THAT(client_peer_->GetNumEphemeralPackets(), Eq(2));
+      EXPECT_THAT(server_peer_->GetNumEphemeralPackets(), Eq(2));
+      EXPECT_THAT(client_peer_->GetNumMessagePackets(), Eq(0));
+      EXPECT_THAT(server_peer_->GetNumMessagePackets(), Eq(0));
+    }
 
     // All streams are ephemeral and should be gone.
     EXPECT_EQ(0u, server_peer_->GetNumActiveStreams());
@@ -449,8 +458,18 @@
 
 TEST_F(QboneSessionTest, StreamConnection) {
   CreateClientAndServerSessions();
+  client_peer_->set_send_packets_as_messages(false);
+  server_peer_->set_send_packets_as_messages(false);
   StartHandshake();
-  TestStreamConnection();
+  TestStreamConnection(false);
+}
+
+TEST_F(QboneSessionTest, Messages) {
+  CreateClientAndServerSessions();
+  client_peer_->set_send_packets_as_messages(true);
+  server_peer_->set_send_packets_as_messages(true);
+  StartHandshake();
+  TestStreamConnection(true);
 }
 
 TEST_F(QboneSessionTest, ClientRejection) {
@@ -481,9 +500,9 @@
 TEST_F(QboneSessionTest, CannotCreateDataStreamBeforeHandshake) {
   CreateClientAndServerSessions();
   EXPECT_QUIC_BUG(client_peer_->ProcessPacketFromNetwork(TestPacketIn("hello")),
-                  "Failed to create an outgoing QBONE stream");
+                  "Attempting to send packet before encryption established");
   EXPECT_QUIC_BUG(server_peer_->ProcessPacketFromNetwork(TestPacketIn("hello")),
-                  "Failed to create an outgoing QBONE stream");
+                  "Attempting to send packet before encryption established");
   EXPECT_EQ(0u, server_peer_->GetNumActiveStreams());
   EXPECT_EQ(0u, client_peer_->GetNumActiveStreams());
 }