Fix packet send time with udp gso packet writer, when the packet is bursted. protected by existing --gfe2_restart_flag_quic_support_release_time_for_gso.

PiperOrigin-RevId: 319112843
Change-Id: I80f2cb98edb4f8d50f357992025691f8f2e26d9b
diff --git a/quic/core/batch_writer/quic_batch_writer_base.cc b/quic/core/batch_writer/quic_batch_writer_base.cc
index 919f76b..46e230c 100644
--- a/quic/core/batch_writer/quic_batch_writer_base.cc
+++ b/quic/core/batch_writer/quic_batch_writer_base.cc
@@ -8,6 +8,7 @@
 #include "net/third_party/quiche/src/quic/platform/api/quic_export.h"
 #include "net/third_party/quiche/src/quic/platform/api/quic_flags.h"
 #include "net/third_party/quiche/src/quic/platform/api/quic_ptr_util.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_server_stats.h"
 
 namespace quic {
 
@@ -29,23 +30,50 @@
   return result;
 }
 
-uint64_t QuicBatchWriterBase::GetReleaseTime(
+QuicBatchWriterBase::ReleaseTime QuicBatchWriterBase::GetReleaseTime(
     const PerPacketOptions* options) const {
   DCHECK(SupportsReleaseTime());
 
   if (options == nullptr) {
-    return 0;
+    return {0, QuicTime::Delta::Zero()};
   }
 
+  const uint64_t now = NowInNanosForReleaseTime();
+  const uint64_t ideal_release_time =
+      now + options->release_time_delay.ToMicroseconds() * 1000;
+
   if ((options->release_time_delay.IsZero() || options->allow_burst) &&
-      !buffered_writes().empty()) {
+      !buffered_writes().empty() &&
+      // If release time of buffered packets is in the past, flush buffered
+      // packets and buffer this packet at the ideal release time.
+      (buffered_writes().back().release_time >= now)) {
     // Send as soon as possible, but no sooner than the last buffered packet.
-    return buffered_writes().back().release_time;
+    const uint64_t actual_release_time = buffered_writes().back().release_time;
+
+    const int64_t offset_ns = actual_release_time - ideal_release_time;
+    if (offset_ns >= 0) {
+      QUIC_SERVER_HISTOGRAM_TIMES("batch_writer_positive_release_time_offset",
+                                  offset_ns / 1000, 1, 100000, 50,
+                                  "Duration from ideal release time to actual "
+                                  "release time, in microseconds.");
+    } else {
+      QUIC_SERVER_HISTOGRAM_TIMES("batch_writer_negative_release_time_offset",
+                                  -offset_ns / 1000, 1, 100000, 50,
+                                  "Duration from actual release time to ideal "
+                                  "release time, in microseconds.");
+    }
+
+    ReleaseTime result{actual_release_time,
+                       QuicTime::Delta::FromMicroseconds(offset_ns / 1000)};
+
+    QUIC_DLOG(INFO) << "ideal_release_time:" << ideal_release_time
+                    << ", actual_release_time:" << actual_release_time
+                    << ", offset:" << result.release_time_offset;
+    return result;
   }
 
   // Send according to the release time delay.
-  return NowInNanosForReleaseTime() +
-         options->release_time_delay.ToMicroseconds() * 1000;
+  return {ideal_release_time, QuicTime::Delta::Zero()};
 }
 
 WriteResult QuicBatchWriterBase::InternalWritePacket(
@@ -58,10 +86,13 @@
     return WriteResult(WRITE_STATUS_MSG_TOO_BIG, EMSGSIZE);
   }
 
-  uint64_t release_time = SupportsReleaseTime() ? GetReleaseTime(options) : 0;
+  ReleaseTime release_time = SupportsReleaseTime()
+                                 ? GetReleaseTime(options)
+                                 : ReleaseTime{0, QuicTime::Delta::Zero()};
 
-  const CanBatchResult can_batch_result = CanBatch(
-      buffer, buf_len, self_address, peer_address, options, release_time);
+  const CanBatchResult can_batch_result =
+      CanBatch(buffer, buf_len, self_address, peer_address, options,
+               release_time.actual_release_time);
 
   bool buffered = false;
   bool flush = can_batch_result.must_flush;
@@ -69,7 +100,8 @@
   if (can_batch_result.can_batch) {
     QuicBatchWriterBuffer::PushResult push_result =
         batch_buffer_->PushBufferedWrite(buffer, buf_len, self_address,
-                                         peer_address, options, release_time);
+                                         peer_address, options,
+                                         release_time.actual_release_time);
     if (push_result.succeeded) {
       buffered = true;
       // If there's no space left after the packet is buffered, force a flush.
@@ -81,12 +113,14 @@
   }
 
   if (!flush) {
-    return WriteResult(WRITE_STATUS_OK, 0);
+    WriteResult result(WRITE_STATUS_OK, 0);
+    result.send_time_offset = release_time.release_time_offset;
+    return result;
   }
 
   size_t num_buffered_packets = buffered_writes().size();
   const FlushImplResult flush_result = CheckedFlush();
-  const WriteResult& result = flush_result.write_result;
+  WriteResult result = flush_result.write_result;
   QUIC_DVLOG(1) << "Internally flushed " << flush_result.num_packets_sent
                 << " out of " << num_buffered_packets
                 << " packets. WriteResult=" << result;
@@ -103,18 +137,18 @@
         buffered ? buffered_writes().size() : buffered_writes().size() + 1;
 
     batch_buffer().Clear();
-    WriteResult result_with_dropped = result;
-    result_with_dropped.dropped_packets =
+    result.dropped_packets =
         dropped_packets > std::numeric_limits<uint16_t>::max()
             ? std::numeric_limits<uint16_t>::max()
             : static_cast<uint16_t>(dropped_packets);
-    return result_with_dropped;
+    return result;
   }
 
   if (!buffered) {
     QuicBatchWriterBuffer::PushResult push_result =
         batch_buffer_->PushBufferedWrite(buffer, buf_len, self_address,
-                                         peer_address, options, release_time);
+                                         peer_address, options,
+                                         release_time.actual_release_time);
     buffered = push_result.succeeded;
 
     // Since buffered_writes has been emptied, this write must have been
@@ -125,6 +159,7 @@
                            << ", buf_len:" << buf_len;
   }
 
+  result.send_time_offset = release_time.release_time_offset;
   return result;
 }
 
diff --git a/quic/core/batch_writer/quic_batch_writer_base.h b/quic/core/batch_writer/quic_batch_writer_base.h
index 0213894..0ba8113 100644
--- a/quic/core/batch_writer/quic_batch_writer_base.h
+++ b/quic/core/batch_writer/quic_batch_writer_base.h
@@ -73,7 +73,14 @@
 
   // Given the release delay in |options| and the state of |batch_buffer_|, get
   // the absolute release time.
-  uint64_t GetReleaseTime(const PerPacketOptions* options) const;
+  struct QUIC_NO_EXPORT ReleaseTime {
+    // The actual (absolute) release time.
+    uint64_t actual_release_time = 0;
+    // The difference between |actual_release_time| and ideal release time,
+    // which is (now + |options->release_time_delay|).
+    QuicTime::Delta release_time_offset = QuicTime::Delta::Zero();
+  };
+  ReleaseTime GetReleaseTime(const PerPacketOptions* options) const;
 
   struct QUIC_EXPORT_PRIVATE CanBatchResult {
     CanBatchResult(bool can_batch, bool must_flush)
diff --git a/quic/core/batch_writer/quic_gso_batch_writer.h b/quic/core/batch_writer/quic_gso_batch_writer.h
index 64c7e02..d210cf7 100644
--- a/quic/core/batch_writer/quic_gso_batch_writer.h
+++ b/quic/core/batch_writer/quic_gso_batch_writer.h
@@ -84,7 +84,8 @@
                   << ", peer_address: " << first.peer_address.ToString()
                   << ", num_segments: " << buffered_writes().size()
                   << ", total_bytes: " << total_bytes
-                  << ", gso_size: " << gso_size;
+                  << ", gso_size: " << gso_size
+                  << ", release_time: " << first.release_time;
 
     // All segments in a GSO packet share the same fate - if the write failed,
     // none of them are sent, and it's not needed to call PopBufferedWrite().
diff --git a/quic/core/batch_writer/quic_gso_batch_writer_test.cc b/quic/core/batch_writer/quic_gso_batch_writer_test.cc
index c7d695a..158783d 100644
--- a/quic/core/batch_writer/quic_gso_batch_writer_test.cc
+++ b/quic/core/batch_writer/quic_gso_batch_writer_test.cc
@@ -42,6 +42,7 @@
   using QuicGsoBatchWriter::GetReleaseTime;
   using QuicGsoBatchWriter::MaxSegments;
   using QuicGsoBatchWriter::QuicGsoBatchWriter;
+  using QuicGsoBatchWriter::ReleaseTime;
 
   static std::unique_ptr<TestQuicGsoBatchWriter>
   NewInstanceWithReleaseTimeSupport() {
@@ -391,7 +392,7 @@
 
 TEST_F(QuicGsoBatchWriterTest, ReleaseTimeNullOptions) {
   auto writer = TestQuicGsoBatchWriter::NewInstanceWithReleaseTimeSupport();
-  EXPECT_EQ(0, writer->GetReleaseTime(nullptr));
+  EXPECT_EQ(0, writer->GetReleaseTime(nullptr).actual_release_time);
 }
 
 TEST_F(QuicGsoBatchWriterTest, ReleaseTime) {
@@ -402,17 +403,22 @@
   TestPerPacketOptions options;
   EXPECT_TRUE(options.release_time_delay.IsZero());
   EXPECT_FALSE(options.allow_burst);
-  EXPECT_EQ(MillisToNanos(1), writer->GetReleaseTime(&options));
+  EXPECT_EQ(MillisToNanos(1),
+            writer->GetReleaseTime(&options).actual_release_time);
 
   // The 1st packet has no delay.
-  ASSERT_EQ(write_buffered, WritePacketWithOptions(writer.get(), &options));
+  WriteResult result = WritePacketWithOptions(writer.get(), &options);
+  ASSERT_EQ(write_buffered, result);
   EXPECT_EQ(MillisToNanos(1), writer->buffered_writes().back().release_time);
+  EXPECT_EQ(result.send_time_offset, QuicTime::Delta::Zero());
 
   // The 2nd packet has some delay, but allows burst.
   options.release_time_delay = QuicTime::Delta::FromMilliseconds(3);
   options.allow_burst = true;
-  ASSERT_EQ(write_buffered, WritePacketWithOptions(writer.get(), &options));
+  result = WritePacketWithOptions(writer.get(), &options);
+  ASSERT_EQ(write_buffered, result);
   EXPECT_EQ(MillisToNanos(1), writer->buffered_writes().back().release_time);
+  EXPECT_EQ(result.send_time_offset, QuicTime::Delta::FromMilliseconds(-3));
 
   // The 3rd packet has more delay and does not allow burst.
   // The first 2 packets are flushed due to different release time.
@@ -424,14 +430,17 @@
       }));
   options.release_time_delay = QuicTime::Delta::FromMilliseconds(5);
   options.allow_burst = false;
-  ASSERT_EQ(WriteResult(WRITE_STATUS_OK, 2700),
-            WritePacketWithOptions(writer.get(), &options));
+  result = WritePacketWithOptions(writer.get(), &options);
+  ASSERT_EQ(WriteResult(WRITE_STATUS_OK, 2700), result);
   EXPECT_EQ(MillisToNanos(6), writer->buffered_writes().back().release_time);
+  EXPECT_EQ(result.send_time_offset, QuicTime::Delta::Zero());
 
   // The 4th packet has same delay, but allows burst.
   options.allow_burst = true;
-  ASSERT_EQ(write_buffered, WritePacketWithOptions(writer.get(), &options));
+  result = WritePacketWithOptions(writer.get(), &options);
+  ASSERT_EQ(write_buffered, result);
   EXPECT_EQ(MillisToNanos(6), writer->buffered_writes().back().release_time);
+  EXPECT_EQ(result.send_time_offset, QuicTime::Delta::Zero());
 
   // The 5th packet has same delay, allows burst, but is shorter.
   // Packets 3,4 and 5 are flushed.
@@ -442,7 +451,8 @@
         return 0;
       }));
   options.allow_burst = true;
-  EXPECT_EQ(MillisToNanos(6), writer->GetReleaseTime(&options));
+  EXPECT_EQ(MillisToNanos(6),
+            writer->GetReleaseTime(&options).actual_release_time);
   ASSERT_EQ(WriteResult(WRITE_STATUS_OK, 3000),
             writer->WritePacket(&packet_buffer_[0], 300, self_address_,
                                 peer_address_, &options));
@@ -452,8 +462,10 @@
   // words, the release time should still be the same as packets 3-5.
   writer->ForceReleaseTimeMs(2);
   options.release_time_delay = QuicTime::Delta::FromMilliseconds(4);
-  ASSERT_EQ(write_buffered, WritePacketWithOptions(writer.get(), &options));
+  result = WritePacketWithOptions(writer.get(), &options);
+  ASSERT_EQ(write_buffered, result);
   EXPECT_EQ(MillisToNanos(6), writer->buffered_writes().back().release_time);
+  EXPECT_EQ(result.send_time_offset, QuicTime::Delta::Zero());
 }
 
 }  // namespace
diff --git a/quic/core/quic_connection.cc b/quic/core/quic_connection.cc
index 12a2e85..53dc7d9 100644
--- a/quic/core/quic_connection.cc
+++ b/quic/core/quic_connection.cc
@@ -2728,6 +2728,14 @@
     return false;
   }
 
+  if (result.status == WRITE_STATUS_OK) {
+    // packet_send_time is the ideal send time, if allow_burst is true, writer
+    // may have sent it earlier than that.
+    DCHECK((per_packet_options_ && per_packet_options_->allow_burst) ||
+           result.send_time_offset.IsZero());
+    packet_send_time = packet_send_time + result.send_time_offset;
+  }
+
   if (debug_visitor_ != nullptr) {
     // Pass the write result to the visitor.
     debug_visitor_->OnPacketSent(*packet, packet->transmission_type,
diff --git a/quic/core/quic_types.h b/quic/core/quic_types.h
index 0c736f9..b8e900d 100644
--- a/quic/core/quic_types.h
+++ b/quic/core/quic_types.h
@@ -151,6 +151,11 @@
   // Number of packets dropped as a result of this write.
   // Only used by batch writers. Otherwise always 0.
   uint16_t dropped_packets = 0;
+  // The delta between a packet's ideal and actual send time:
+  //     actual_send_time = ideal_send_time + send_time_offset
+  //                      = (now + release_time_delay) + send_time_offset
+  // Only valid if |status| is WRITE_STATUS_OK.
+  QuicTime::Delta send_time_offset = QuicTime::Delta::Zero();
   // TODO(wub): In some cases, WRITE_STATUS_ERROR may set an error_code and
   // WRITE_STATUS_BLOCKED_DATA_BUFFERED may set bytes_written. This may need
   // some cleaning up so that perhaps both values can be set and valid.