(n/a) open source quic gso batch writer. not protected.
To chromium code merger: do not add these files to BUILD.gn
PiperOrigin-RevId: 315328138
Change-Id: I4dc9d2a682659dfb8e04b8bdaa6c1423995f0ada
diff --git a/quic/core/batch_writer/quic_batch_writer_base.cc b/quic/core/batch_writer/quic_batch_writer_base.cc
new file mode 100644
index 0000000..919f76b
--- /dev/null
+++ b/quic/core/batch_writer/quic_batch_writer_base.cc
@@ -0,0 +1,177 @@
+// Copyright (c) 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/third_party/quiche/src/quic/core/batch_writer/quic_batch_writer_base.h"
+#include <cstdint>
+
+#include "net/third_party/quiche/src/quic/platform/api/quic_export.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_flags.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_ptr_util.h"
+
+namespace quic {
+
+QuicBatchWriterBase::QuicBatchWriterBase(
+ std::unique_ptr<QuicBatchWriterBuffer> batch_buffer)
+ : write_blocked_(false), batch_buffer_(std::move(batch_buffer)) {}
+
+WriteResult QuicBatchWriterBase::WritePacket(
+ const char* buffer,
+ size_t buf_len,
+ const QuicIpAddress& self_address,
+ const QuicSocketAddress& peer_address,
+ PerPacketOptions* options) {
+ const WriteResult result =
+ InternalWritePacket(buffer, buf_len, self_address, peer_address, options);
+ if (result.status == WRITE_STATUS_BLOCKED) {
+ write_blocked_ = true;
+ }
+ return result;
+}
+
+uint64_t QuicBatchWriterBase::GetReleaseTime(
+ const PerPacketOptions* options) const {
+ DCHECK(SupportsReleaseTime());
+
+ if (options == nullptr) {
+ return 0;
+ }
+
+ if ((options->release_time_delay.IsZero() || options->allow_burst) &&
+ !buffered_writes().empty()) {
+ // Send as soon as possible, but no sooner than the last buffered packet.
+ return buffered_writes().back().release_time;
+ }
+
+ // Send according to the release time delay.
+ return NowInNanosForReleaseTime() +
+ options->release_time_delay.ToMicroseconds() * 1000;
+}
+
+WriteResult QuicBatchWriterBase::InternalWritePacket(
+ const char* buffer,
+ size_t buf_len,
+ const QuicIpAddress& self_address,
+ const QuicSocketAddress& peer_address,
+ PerPacketOptions* options) {
+ if (buf_len > kMaxOutgoingPacketSize) {
+ return WriteResult(WRITE_STATUS_MSG_TOO_BIG, EMSGSIZE);
+ }
+
+ uint64_t release_time = SupportsReleaseTime() ? GetReleaseTime(options) : 0;
+
+ const CanBatchResult can_batch_result = CanBatch(
+ buffer, buf_len, self_address, peer_address, options, release_time);
+
+ bool buffered = false;
+ bool flush = can_batch_result.must_flush;
+
+ if (can_batch_result.can_batch) {
+ QuicBatchWriterBuffer::PushResult push_result =
+ batch_buffer_->PushBufferedWrite(buffer, buf_len, self_address,
+ peer_address, options, release_time);
+ if (push_result.succeeded) {
+ buffered = true;
+ // If there's no space left after the packet is buffered, force a flush.
+ flush = flush || (batch_buffer_->GetNextWriteLocation() == nullptr);
+ } else {
+ // If there's no space without this packet, force a flush.
+ flush = true;
+ }
+ }
+
+ if (!flush) {
+ return WriteResult(WRITE_STATUS_OK, 0);
+ }
+
+ size_t num_buffered_packets = buffered_writes().size();
+ const FlushImplResult flush_result = CheckedFlush();
+ const WriteResult& result = flush_result.write_result;
+ QUIC_DVLOG(1) << "Internally flushed " << flush_result.num_packets_sent
+ << " out of " << num_buffered_packets
+ << " packets. WriteResult=" << result;
+
+ if (result.status != WRITE_STATUS_OK) {
+ if (IsWriteBlockedStatus(result.status)) {
+ return WriteResult(
+ buffered ? WRITE_STATUS_BLOCKED_DATA_BUFFERED : WRITE_STATUS_BLOCKED,
+ result.error_code);
+ }
+
+ // Drop all packets, including the one being written.
+ size_t dropped_packets =
+ buffered ? buffered_writes().size() : buffered_writes().size() + 1;
+
+ batch_buffer().Clear();
+ WriteResult result_with_dropped = result;
+ result_with_dropped.dropped_packets =
+ dropped_packets > std::numeric_limits<uint16_t>::max()
+ ? std::numeric_limits<uint16_t>::max()
+ : static_cast<uint16_t>(dropped_packets);
+ return result_with_dropped;
+ }
+
+ if (!buffered) {
+ QuicBatchWriterBuffer::PushResult push_result =
+ batch_buffer_->PushBufferedWrite(buffer, buf_len, self_address,
+ peer_address, options, release_time);
+ buffered = push_result.succeeded;
+
+ // Since buffered_writes has been emptied, this write must have been
+ // buffered successfully.
+ QUIC_BUG_IF(!buffered) << "Failed to push to an empty batch buffer."
+ << " self_addr:" << self_address.ToString()
+ << ", peer_addr:" << peer_address.ToString()
+ << ", buf_len:" << buf_len;
+ }
+
+ return result;
+}
+
+QuicBatchWriterBase::FlushImplResult QuicBatchWriterBase::CheckedFlush() {
+ if (buffered_writes().empty()) {
+ return FlushImplResult{WriteResult(WRITE_STATUS_OK, 0),
+ /*num_packets_sent=*/0, /*bytes_written=*/0};
+ }
+
+ const FlushImplResult flush_result = FlushImpl();
+
+ // Either flush_result.write_result.status is not WRITE_STATUS_OK, or it is
+ // WRITE_STATUS_OK and batch_buffer is empty.
+ DCHECK(flush_result.write_result.status != WRITE_STATUS_OK ||
+ buffered_writes().empty());
+
+ // Flush should never return WRITE_STATUS_BLOCKED_DATA_BUFFERED.
+ DCHECK(flush_result.write_result.status !=
+ WRITE_STATUS_BLOCKED_DATA_BUFFERED);
+
+ return flush_result;
+}
+
+WriteResult QuicBatchWriterBase::Flush() {
+ size_t num_buffered_packets = buffered_writes().size();
+ FlushImplResult flush_result = CheckedFlush();
+ QUIC_DVLOG(1) << "Externally flushed " << flush_result.num_packets_sent
+ << " out of " << num_buffered_packets
+ << " packets. WriteResult=" << flush_result.write_result;
+
+ if (IsWriteError(flush_result.write_result.status)) {
+ if (buffered_writes().size() > std::numeric_limits<uint16_t>::max()) {
+ flush_result.write_result.dropped_packets =
+ std::numeric_limits<uint16_t>::max();
+ } else {
+ flush_result.write_result.dropped_packets =
+ static_cast<uint16_t>(buffered_writes().size());
+ }
+ // Treat all errors as non-retryable fatal errors. Drop all buffered packets
+ // to avoid sending them and getting the same error again.
+ batch_buffer().Clear();
+ }
+
+ if (flush_result.write_result.status == WRITE_STATUS_BLOCKED) {
+ write_blocked_ = true;
+ }
+ return flush_result.write_result;
+}
+
+} // namespace quic
diff --git a/quic/core/batch_writer/quic_batch_writer_base.h b/quic/core/batch_writer/quic_batch_writer_base.h
new file mode 100644
index 0000000..0213894
--- /dev/null
+++ b/quic/core/batch_writer/quic_batch_writer_base.h
@@ -0,0 +1,149 @@
+// Copyright (c) 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_PLATFORM_IMPL_BATCH_WRITER_QUIC_BATCH_WRITER_BASE_H_
+#define QUICHE_QUIC_PLATFORM_IMPL_BATCH_WRITER_QUIC_BATCH_WRITER_BASE_H_
+
+#include <cstdint>
+#include "net/third_party/quiche/src/quic/core/batch_writer/quic_batch_writer_buffer.h"
+#include "net/third_party/quiche/src/quic/core/quic_packet_writer.h"
+#include "net/third_party/quiche/src/quic/core/quic_types.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_ip_address.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_socket_address.h"
+
+namespace quic {
+
+// QuicBatchWriterBase implements logic common to all derived batch writers,
+// including maintaining write blockage state and a skeleton implemention of
+// WritePacket().
+// A derived batch writer must override the FlushImpl() function to send all
+// buffered writes in a batch. It must also override the CanBatch() function
+// to control whether/when a WritePacket() call should flush.
+class QUIC_EXPORT_PRIVATE QuicBatchWriterBase : public QuicPacketWriter {
+ public:
+ explicit QuicBatchWriterBase(
+ std::unique_ptr<QuicBatchWriterBuffer> batch_buffer);
+
+ // ATTENTION: If this write triggered a flush, and the flush failed, all
+ // buffered packets will be dropped to allow the next write to work. The
+ // number of dropped packets can be found in WriteResult.dropped_packets.
+ WriteResult WritePacket(const char* buffer,
+ size_t buf_len,
+ const QuicIpAddress& self_address,
+ const QuicSocketAddress& peer_address,
+ PerPacketOptions* options) override;
+
+ bool IsWriteBlocked() const final { return write_blocked_; }
+
+ void SetWritable() final { write_blocked_ = false; }
+
+ QuicByteCount GetMaxPacketSize(
+ const QuicSocketAddress& peer_address) const final {
+ return kMaxOutgoingPacketSize;
+ }
+
+ bool SupportsReleaseTime() const { return false; }
+
+ bool IsBatchMode() const final { return true; }
+
+ QuicPacketBuffer GetNextWriteLocation(
+ const QuicIpAddress& self_address,
+ const QuicSocketAddress& peer_address) final {
+ // No need to explicitly delete QuicBatchWriterBuffer.
+ return {batch_buffer_->GetNextWriteLocation(), nullptr};
+ }
+
+ WriteResult Flush() final;
+
+ protected:
+ const QuicBatchWriterBuffer& batch_buffer() const { return *batch_buffer_; }
+ QuicBatchWriterBuffer& batch_buffer() { return *batch_buffer_; }
+
+ const QuicCircularDeque<BufferedWrite>& buffered_writes() const {
+ return batch_buffer_->buffered_writes();
+ }
+
+ // Get the current time in nanos which is understood by the sending api for
+ // releasing packets in the future.
+ virtual uint64_t NowInNanosForReleaseTime() const {
+ DCHECK(false) << "Should not be called since release time is unsupported.";
+ return 0;
+ }
+
+ // Given the release delay in |options| and the state of |batch_buffer_|, get
+ // the absolute release time.
+ uint64_t GetReleaseTime(const PerPacketOptions* options) const;
+
+ struct QUIC_EXPORT_PRIVATE CanBatchResult {
+ CanBatchResult(bool can_batch, bool must_flush)
+ : can_batch(can_batch), must_flush(must_flush) {}
+ // Whether this write can be batched with existing buffered writes.
+ bool can_batch;
+ // If |can_batch|, whether the caller must flush after this packet is
+ // buffered.
+ // Always true if not |can_batch|.
+ bool must_flush;
+ };
+
+ // Given the existing buffered writes(in buffered_writes()), whether a new
+ // write(in the arguments) can be batched.
+ virtual CanBatchResult CanBatch(const char* buffer,
+ size_t buf_len,
+ const QuicIpAddress& self_address,
+ const QuicSocketAddress& peer_address,
+ const PerPacketOptions* options,
+ uint64_t release_time) const = 0;
+
+ struct QUIC_EXPORT_PRIVATE FlushImplResult {
+ // The return value of the Flush() interface, which is:
+ // - WriteResult(WRITE_STATUS_OK, <bytes_flushed>) if all buffered writes
+ // were sent successfully.
+ // - WRITE_STATUS_BLOCKED or WRITE_STATUS_ERROR, if the batch write is
+ // blocked or returned an error while sending. If a portion of buffered
+ // writes were sent successfully, |FlushImplResult.num_packets_sent| and
+ // |FlushImplResult.bytes_written| contain the number of successfully sent
+ // packets and their total bytes.
+ WriteResult write_result;
+ int num_packets_sent;
+ // If write_result.status == WRITE_STATUS_OK, |bytes_written| will be equal
+ // to write_result.bytes_written. Otherwise |bytes_written| will be the
+ // number of bytes written before WRITE_BLOCK or WRITE_ERROR happened.
+ int bytes_written;
+ };
+
+ // Send all buffered writes(in buffered_writes()) in a batch.
+ // buffered_writes() is guaranteed to be non-empty when this function is
+ // called.
+ virtual FlushImplResult FlushImpl() = 0;
+
+ private:
+ WriteResult InternalWritePacket(const char* buffer,
+ size_t buf_len,
+ const QuicIpAddress& self_address,
+ const QuicSocketAddress& peer_address,
+ PerPacketOptions* options);
+
+ // Calls FlushImpl() and check its post condition.
+ FlushImplResult CheckedFlush();
+
+ bool write_blocked_;
+ std::unique_ptr<QuicBatchWriterBuffer> batch_buffer_;
+};
+
+// QuicUdpBatchWriter is a batch writer backed by a UDP socket.
+class QUIC_EXPORT_PRIVATE QuicUdpBatchWriter : public QuicBatchWriterBase {
+ public:
+ QuicUdpBatchWriter(std::unique_ptr<QuicBatchWriterBuffer> batch_buffer,
+ int fd)
+ : QuicBatchWriterBase(std::move(batch_buffer)), fd_(fd) {}
+
+ int fd() const { return fd_; }
+
+ private:
+ const int fd_;
+};
+
+} // namespace quic
+
+#endif // QUICHE_QUIC_PLATFORM_IMPL_BATCH_WRITER_QUIC_BATCH_WRITER_BASE_H_
diff --git a/quic/core/batch_writer/quic_batch_writer_buffer.cc b/quic/core/batch_writer/quic_batch_writer_buffer.cc
new file mode 100644
index 0000000..6226139
--- /dev/null
+++ b/quic/core/batch_writer/quic_batch_writer_buffer.cc
@@ -0,0 +1,154 @@
+// Copyright (c) 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/third_party/quiche/src/quic/core/batch_writer/quic_batch_writer_buffer.h"
+
+#include <sstream>
+
+namespace quic {
+
+QuicBatchWriterBuffer::QuicBatchWriterBuffer() {
+ memset(buffer_, 0, sizeof(buffer_));
+}
+
+void QuicBatchWriterBuffer::Clear() {
+ buffered_writes_.clear();
+}
+
+std::string QuicBatchWriterBuffer::DebugString() const {
+ std::ostringstream os;
+ os << "{ buffer: " << static_cast<const void*>(buffer_)
+ << " buffer_end: " << static_cast<const void*>(buffer_end())
+ << " buffered_writes_.size(): " << buffered_writes_.size()
+ << " next_write_loc: " << static_cast<const void*>(GetNextWriteLocation())
+ << " SizeInUse: " << SizeInUse() << " }";
+ return os.str();
+}
+
+bool QuicBatchWriterBuffer::Invariants() const {
+ // Buffers in buffered_writes_ should not overlap, and collectively they
+ // should cover a continuous prefix of buffer_.
+ const char* next_buffer = buffer_;
+ for (auto iter = buffered_writes_.begin(); iter != buffered_writes_.end();
+ ++iter) {
+ if ((iter->buffer != next_buffer) ||
+ (iter->buffer + iter->buf_len > buffer_end())) {
+ return false;
+ }
+ next_buffer += iter->buf_len;
+ }
+
+ return (next_buffer - buffer_) == SizeInUse();
+}
+
+char* QuicBatchWriterBuffer::GetNextWriteLocation() const {
+ const char* next_loc =
+ buffered_writes_.empty()
+ ? buffer_
+ : buffered_writes_.back().buffer + buffered_writes_.back().buf_len;
+ if (buffer_end() - next_loc < kMaxOutgoingPacketSize) {
+ return nullptr;
+ }
+ return const_cast<char*>(next_loc);
+}
+
+QuicBatchWriterBuffer::PushResult QuicBatchWriterBuffer::PushBufferedWrite(
+ const char* buffer,
+ size_t buf_len,
+ const QuicIpAddress& self_address,
+ const QuicSocketAddress& peer_address,
+ const PerPacketOptions* options,
+ uint64_t release_time) {
+ DCHECK(Invariants());
+ DCHECK_LE(buf_len, kMaxOutgoingPacketSize);
+
+ PushResult result = {/*succeeded=*/false, /*buffer_copied=*/false};
+ char* next_write_location = GetNextWriteLocation();
+ if (next_write_location == nullptr) {
+ return result;
+ }
+
+ if (buffer != next_write_location) {
+ if (IsExternalBuffer(buffer, buf_len)) {
+ memcpy(next_write_location, buffer, buf_len);
+ } else if (IsInternalBuffer(buffer, buf_len)) {
+ memmove(next_write_location, buffer, buf_len);
+ } else {
+ QUIC_BUG << "Buffer[" << static_cast<const void*>(buffer) << ", "
+ << static_cast<const void*>(buffer + buf_len)
+ << ") overlaps with internal buffer["
+ << static_cast<const void*>(buffer_) << ", "
+ << static_cast<const void*>(buffer_end()) << ")";
+ return result;
+ }
+ result.buffer_copied = true;
+ } else {
+ // In place push, do nothing.
+ }
+ buffered_writes_.emplace_back(
+ next_write_location, buf_len, self_address, peer_address,
+ options ? options->Clone() : std::unique_ptr<PerPacketOptions>(),
+ release_time);
+
+ DCHECK(Invariants());
+
+ result.succeeded = true;
+ return result;
+}
+
+void QuicBatchWriterBuffer::UndoLastPush() {
+ if (!buffered_writes_.empty()) {
+ buffered_writes_.pop_back();
+ }
+}
+
+QuicBatchWriterBuffer::PopResult QuicBatchWriterBuffer::PopBufferedWrite(
+ int32_t num_buffered_writes) {
+ DCHECK(Invariants());
+ DCHECK_GE(num_buffered_writes, 0);
+ DCHECK_LE(num_buffered_writes, buffered_writes_.size());
+
+ PopResult result = {/*num_buffers_popped=*/0,
+ /*moved_remaining_buffers=*/false};
+
+ result.num_buffers_popped = std::max<int32_t>(num_buffered_writes, 0);
+ result.num_buffers_popped =
+ std::min<int32_t>(result.num_buffers_popped, buffered_writes_.size());
+ buffered_writes_.pop_front_n(result.num_buffers_popped);
+
+ if (!buffered_writes_.empty()) {
+ // If not all buffered writes are erased, the remaining ones will not cover
+ // a continuous prefix of buffer_. We'll fix it by moving the remaining
+ // buffers to the beginning of buffer_ and adjust the buffer pointers in all
+ // remaining buffered writes.
+ // This should happen very rarely, about once per write block.
+ result.moved_remaining_buffers = true;
+ const char* buffer_before_move = buffered_writes_.front().buffer;
+ size_t buffer_len_to_move = buffered_writes_.back().buffer +
+ buffered_writes_.back().buf_len -
+ buffer_before_move;
+ memmove(buffer_, buffer_before_move, buffer_len_to_move);
+
+ size_t distance_to_move = buffer_before_move - buffer_;
+ for (BufferedWrite& buffered_write : buffered_writes_) {
+ buffered_write.buffer -= distance_to_move;
+ }
+
+ DCHECK_EQ(buffer_, buffered_writes_.front().buffer);
+ }
+ DCHECK(Invariants());
+
+ return result;
+}
+
+size_t QuicBatchWriterBuffer::SizeInUse() const {
+ if (buffered_writes_.empty()) {
+ return 0;
+ }
+
+ return buffered_writes_.back().buffer + buffered_writes_.back().buf_len -
+ buffer_;
+}
+
+} // namespace quic
diff --git a/quic/core/batch_writer/quic_batch_writer_buffer.h b/quic/core/batch_writer/quic_batch_writer_buffer.h
new file mode 100644
index 0000000..a441ec3
--- /dev/null
+++ b/quic/core/batch_writer/quic_batch_writer_buffer.h
@@ -0,0 +1,95 @@
+// Copyright (c) 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_PLATFORM_IMPL_BATCH_WRITER_QUIC_BATCH_WRITER_BUFFER_H_
+#define QUICHE_QUIC_PLATFORM_IMPL_BATCH_WRITER_QUIC_BATCH_WRITER_BUFFER_H_
+
+#include "net/third_party/quiche/src/quic/core/quic_circular_deque.h"
+#include "net/third_party/quiche/src/quic/core/quic_linux_socket_utils.h"
+#include "net/third_party/quiche/src/quic/core/quic_packet_writer.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_aligned.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_ip_address.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_socket_address.h"
+
+namespace quic {
+
+// QuicBatchWriterBuffer manages an internal buffer to hold data from multiple
+// packets. Packet data are placed continuously within the internal buffer such
+// that they can be sent by a QuicGsoBatchWriter.
+// This class can also be used by a QuicBatchWriter which uses sendmmsg,
+// although it is not optimized for that use case.
+class QUIC_EXPORT_PRIVATE QuicBatchWriterBuffer {
+ public:
+ QuicBatchWriterBuffer();
+
+ // Clear all buffered writes, but leave the internal buffer intact.
+ void Clear();
+
+ char* GetNextWriteLocation() const;
+
+ // Push a buffered write to the back.
+ struct QUIC_EXPORT_PRIVATE PushResult {
+ bool succeeded;
+ // True in one of the following cases:
+ // 1) The packet buffer is external and copied to the internal buffer, or
+ // 2) The packet buffer is from the internal buffer and moved within it.
+ // This only happens if PopBufferedWrite is called in the middle of a
+ // in-place push.
+ // Only valid if |succeeded| is true.
+ bool buffer_copied;
+ };
+
+ PushResult PushBufferedWrite(const char* buffer,
+ size_t buf_len,
+ const QuicIpAddress& self_address,
+ const QuicSocketAddress& peer_address,
+ const PerPacketOptions* options,
+ uint64_t release_time);
+
+ void UndoLastPush();
+
+ // Pop |num_buffered_writes| buffered writes from the front.
+ // |num_buffered_writes| will be capped to [0, buffered_writes().size()]
+ // before it is used.
+ struct QUIC_EXPORT_PRIVATE PopResult {
+ int32_t num_buffers_popped;
+ // True if after |num_buffers_popped| buffers are popped from front, the
+ // remaining buffers are moved to the beginning of the internal buffer.
+ // This should normally be false.
+ bool moved_remaining_buffers;
+ };
+ PopResult PopBufferedWrite(int32_t num_buffered_writes);
+
+ const QuicCircularDeque<BufferedWrite>& buffered_writes() const {
+ return buffered_writes_;
+ }
+
+ bool IsExternalBuffer(const char* buffer, size_t buf_len) const {
+ return (buffer + buf_len) <= buffer_ || buffer >= buffer_end();
+ }
+ bool IsInternalBuffer(const char* buffer, size_t buf_len) const {
+ return buffer >= buffer_ && (buffer + buf_len) <= buffer_end();
+ }
+
+ // Number of bytes used in |buffer_|.
+ // PushBufferedWrite() increases this; PopBufferedWrite decreases this.
+ size_t SizeInUse() const;
+
+ // Rounded up from |kMaxGsoPacketSize|, which is the maximum allowed
+ // size of a GSO packet.
+ static const size_t kBufferSize = 64 * 1024;
+
+ std::string DebugString() const;
+
+ protected:
+ // Whether the invariants of the buffer are upheld. For debug & test only.
+ bool Invariants() const;
+ const char* const buffer_end() const { return buffer_ + sizeof(buffer_); }
+ QUIC_CACHELINE_ALIGNED char buffer_[kBufferSize];
+ QuicCircularDeque<BufferedWrite> buffered_writes_;
+};
+
+} // namespace quic
+
+#endif // QUICHE_QUIC_PLATFORM_IMPL_BATCH_WRITER_QUIC_BATCH_WRITER_BUFFER_H_
diff --git a/quic/core/batch_writer/quic_batch_writer_buffer_test.cc b/quic/core/batch_writer/quic_batch_writer_buffer_test.cc
new file mode 100644
index 0000000..4cdb747
--- /dev/null
+++ b/quic/core/batch_writer/quic_batch_writer_buffer_test.cc
@@ -0,0 +1,281 @@
+// Copyright (c) 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/third_party/quiche/src/quic/core/batch_writer/quic_batch_writer_buffer.h"
+#include <memory>
+#include <string>
+
+#include "net/third_party/quiche/src/quic/core/quic_constants.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_ip_address.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_socket_address.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_test.h"
+
+namespace quic {
+namespace test {
+namespace {
+
+class QUIC_EXPORT_PRIVATE TestQuicBatchWriterBuffer
+ : public QuicBatchWriterBuffer {
+ public:
+ using QuicBatchWriterBuffer::buffer_;
+ using QuicBatchWriterBuffer::buffered_writes_;
+};
+
+static const size_t kBatchBufferSize = QuicBatchWriterBuffer::kBufferSize;
+
+class QuicBatchWriterBufferTest : public QuicTest {
+ public:
+ QuicBatchWriterBufferTest() { SwitchToNewBuffer(); }
+
+ void SwitchToNewBuffer() {
+ batch_buffer_ = std::make_unique<TestQuicBatchWriterBuffer>();
+ }
+
+ // Fill packet_buffer_ with kMaxOutgoingPacketSize bytes of |c|s.
+ char* FillPacketBuffer(char c) {
+ return FillPacketBuffer(c, packet_buffer_, kMaxOutgoingPacketSize);
+ }
+
+ // Fill |packet_buffer| with kMaxOutgoingPacketSize bytes of |c|s.
+ char* FillPacketBuffer(char c, char* packet_buffer) {
+ return FillPacketBuffer(c, packet_buffer, kMaxOutgoingPacketSize);
+ }
+
+ // Fill |packet_buffer| with |buf_len| bytes of |c|s.
+ char* FillPacketBuffer(char c, char* packet_buffer, size_t buf_len) {
+ memset(packet_buffer, c, buf_len);
+ return packet_buffer;
+ }
+
+ void CheckBufferedWriteContent(int buffered_write_index,
+ char buffer_content,
+ size_t buf_len,
+ const QuicIpAddress& self_addr,
+ const QuicSocketAddress& peer_addr,
+ const PerPacketOptions* options) {
+ const BufferedWrite& buffered_write =
+ batch_buffer_->buffered_writes()[buffered_write_index];
+ EXPECT_EQ(buf_len, buffered_write.buf_len);
+ for (size_t i = 0; i < buf_len; ++i) {
+ EXPECT_EQ(buffer_content, buffered_write.buffer[i]);
+ if (buffer_content != buffered_write.buffer[i]) {
+ break;
+ }
+ }
+ EXPECT_EQ(self_addr, buffered_write.self_address);
+ EXPECT_EQ(peer_addr, buffered_write.peer_address);
+ if (options == nullptr) {
+ EXPECT_EQ(nullptr, buffered_write.options);
+ } else {
+ EXPECT_EQ(options->release_time_delay,
+ buffered_write.options->release_time_delay);
+ }
+ }
+
+ protected:
+ std::unique_ptr<TestQuicBatchWriterBuffer> batch_buffer_;
+ QuicIpAddress self_addr_;
+ QuicSocketAddress peer_addr_;
+ uint64_t release_time_ = 0;
+ char packet_buffer_[kMaxOutgoingPacketSize];
+};
+
+class BufferSizeSequence {
+ public:
+ explicit BufferSizeSequence(
+ std::vector<std::pair<std::vector<size_t>, size_t>> stages)
+ : stages_(std::move(stages)),
+ total_buf_len_(0),
+ stage_index_(0),
+ sequence_index_(0) {}
+
+ size_t Next() {
+ const std::vector<size_t>& seq = stages_[stage_index_].first;
+ size_t buf_len = seq[sequence_index_++ % seq.size()];
+ total_buf_len_ += buf_len;
+ if (stages_[stage_index_].second <= total_buf_len_) {
+ stage_index_ = std::min(stage_index_ + 1, stages_.size() - 1);
+ }
+ return buf_len;
+ }
+
+ private:
+ const std::vector<std::pair<std::vector<size_t>, size_t>> stages_;
+ size_t total_buf_len_;
+ size_t stage_index_;
+ size_t sequence_index_;
+};
+
+// Test in-place pushes. A in-place push is a push with a buffer address that is
+// equal to the result of GetNextWriteLocation().
+TEST_F(QuicBatchWriterBufferTest, InPlacePushes) {
+ std::vector<BufferSizeSequence> buffer_size_sequences = {
+ // Push large writes until the buffer is near full, then switch to 1-byte
+ // writes. This covers the edge cases when detecting insufficient buffer.
+ BufferSizeSequence({{{1350}, kBatchBufferSize - 3000}, {{1}, 1e6}}),
+ // A sequence that looks real.
+ BufferSizeSequence({{{1, 39, 97, 150, 1350, 1350, 1350, 1350}, 1e6}}),
+ };
+
+ for (auto& buffer_size_sequence : buffer_size_sequences) {
+ SwitchToNewBuffer();
+ int64_t num_push_failures = 0;
+
+ while (batch_buffer_->SizeInUse() < kBatchBufferSize) {
+ size_t buf_len = buffer_size_sequence.Next();
+ const bool has_enough_space =
+ (kBatchBufferSize - batch_buffer_->SizeInUse() >=
+ kMaxOutgoingPacketSize);
+
+ char* buffer = batch_buffer_->GetNextWriteLocation();
+
+ if (has_enough_space) {
+ EXPECT_EQ(batch_buffer_->buffer_ + batch_buffer_->SizeInUse(), buffer);
+ } else {
+ EXPECT_EQ(nullptr, buffer);
+ }
+
+ SCOPED_TRACE(testing::Message()
+ << "Before Push: buf_len=" << buf_len
+ << ", has_enough_space=" << has_enough_space
+ << ", batch_buffer=" << batch_buffer_->DebugString());
+
+ auto push_result = batch_buffer_->PushBufferedWrite(
+ buffer, buf_len, self_addr_, peer_addr_, nullptr, release_time_);
+ if (!push_result.succeeded) {
+ ++num_push_failures;
+ }
+ EXPECT_EQ(has_enough_space, push_result.succeeded);
+ EXPECT_FALSE(push_result.buffer_copied);
+ if (!has_enough_space) {
+ break;
+ }
+ }
+ // Expect one and only one failure from the final push operation.
+ EXPECT_EQ(1, num_push_failures);
+ }
+}
+
+// Test some in-place pushes mixed with pushes with external buffers.
+TEST_F(QuicBatchWriterBufferTest, MixedPushes) {
+ // First, a in-place push.
+ char* buffer = batch_buffer_->GetNextWriteLocation();
+ auto push_result = batch_buffer_->PushBufferedWrite(
+ FillPacketBuffer('A', buffer), kDefaultMaxPacketSize, self_addr_,
+ peer_addr_, nullptr, release_time_);
+ EXPECT_TRUE(push_result.succeeded);
+ EXPECT_FALSE(push_result.buffer_copied);
+ CheckBufferedWriteContent(0, 'A', kDefaultMaxPacketSize, self_addr_,
+ peer_addr_, nullptr);
+
+ // Then a push with external buffer.
+ push_result = batch_buffer_->PushBufferedWrite(
+ FillPacketBuffer('B'), kDefaultMaxPacketSize, self_addr_, peer_addr_,
+ nullptr, release_time_);
+ EXPECT_TRUE(push_result.succeeded);
+ EXPECT_TRUE(push_result.buffer_copied);
+ CheckBufferedWriteContent(1, 'B', kDefaultMaxPacketSize, self_addr_,
+ peer_addr_, nullptr);
+
+ // Then another in-place push.
+ buffer = batch_buffer_->GetNextWriteLocation();
+ push_result = batch_buffer_->PushBufferedWrite(
+ FillPacketBuffer('C', buffer), kDefaultMaxPacketSize, self_addr_,
+ peer_addr_, nullptr, release_time_);
+ EXPECT_TRUE(push_result.succeeded);
+ EXPECT_FALSE(push_result.buffer_copied);
+ CheckBufferedWriteContent(2, 'C', kDefaultMaxPacketSize, self_addr_,
+ peer_addr_, nullptr);
+
+ // Then another push with external buffer.
+ push_result = batch_buffer_->PushBufferedWrite(
+ FillPacketBuffer('D'), kDefaultMaxPacketSize, self_addr_, peer_addr_,
+ nullptr, release_time_);
+ EXPECT_TRUE(push_result.succeeded);
+ EXPECT_TRUE(push_result.buffer_copied);
+ CheckBufferedWriteContent(3, 'D', kDefaultMaxPacketSize, self_addr_,
+ peer_addr_, nullptr);
+}
+
+TEST_F(QuicBatchWriterBufferTest, PopAll) {
+ const int kNumBufferedWrites = 10;
+ for (int i = 0; i < kNumBufferedWrites; ++i) {
+ EXPECT_TRUE(batch_buffer_
+ ->PushBufferedWrite(packet_buffer_, kDefaultMaxPacketSize,
+ self_addr_, peer_addr_, nullptr,
+ release_time_)
+ .succeeded);
+ }
+ EXPECT_EQ(kNumBufferedWrites, batch_buffer_->buffered_writes().size());
+
+ auto pop_result = batch_buffer_->PopBufferedWrite(kNumBufferedWrites);
+ EXPECT_EQ(0, batch_buffer_->buffered_writes().size());
+ EXPECT_EQ(kNumBufferedWrites, pop_result.num_buffers_popped);
+ EXPECT_FALSE(pop_result.moved_remaining_buffers);
+}
+
+TEST_F(QuicBatchWriterBufferTest, PopPartial) {
+ const int kNumBufferedWrites = 10;
+ for (int i = 0; i < kNumBufferedWrites; ++i) {
+ EXPECT_TRUE(batch_buffer_
+ ->PushBufferedWrite(FillPacketBuffer('A' + i),
+ kDefaultMaxPacketSize - i, self_addr_,
+ peer_addr_, nullptr, release_time_)
+ .succeeded);
+ }
+
+ for (int i = 0;
+ i < kNumBufferedWrites && !batch_buffer_->buffered_writes().empty();
+ ++i) {
+ const size_t size_before_pop = batch_buffer_->buffered_writes().size();
+ const size_t expect_size_after_pop =
+ size_before_pop < i ? 0 : size_before_pop - i;
+ batch_buffer_->PopBufferedWrite(i);
+ ASSERT_EQ(expect_size_after_pop, batch_buffer_->buffered_writes().size());
+ const char first_write_content =
+ 'A' + kNumBufferedWrites - expect_size_after_pop;
+ const size_t first_write_len =
+ kDefaultMaxPacketSize - kNumBufferedWrites + expect_size_after_pop;
+ for (int j = 0; j < expect_size_after_pop; ++j) {
+ CheckBufferedWriteContent(j, first_write_content + j, first_write_len - j,
+ self_addr_, peer_addr_, nullptr);
+ }
+ }
+}
+
+TEST_F(QuicBatchWriterBufferTest, InPlacePushWithPops) {
+ // First, a in-place push.
+ char* buffer = batch_buffer_->GetNextWriteLocation();
+ const size_t first_packet_len = 2;
+ auto push_result = batch_buffer_->PushBufferedWrite(
+ FillPacketBuffer('A', buffer, first_packet_len), first_packet_len,
+ self_addr_, peer_addr_, nullptr, release_time_);
+ EXPECT_TRUE(push_result.succeeded);
+ EXPECT_FALSE(push_result.buffer_copied);
+ CheckBufferedWriteContent(0, 'A', first_packet_len, self_addr_, peer_addr_,
+ nullptr);
+
+ // Simulate the case where the writer wants to do another in-place push, but
+ // can't do so because it can't be batched with the first buffer.
+ buffer = batch_buffer_->GetNextWriteLocation();
+ const size_t second_packet_len = 1350;
+
+ // Flush the first buffer.
+ auto pop_result = batch_buffer_->PopBufferedWrite(1);
+ EXPECT_EQ(1, pop_result.num_buffers_popped);
+ EXPECT_FALSE(pop_result.moved_remaining_buffers);
+
+ // Now the second push.
+ push_result = batch_buffer_->PushBufferedWrite(
+ FillPacketBuffer('B', buffer, second_packet_len), second_packet_len,
+ self_addr_, peer_addr_, nullptr, release_time_);
+ EXPECT_TRUE(push_result.succeeded);
+ EXPECT_TRUE(push_result.buffer_copied);
+ CheckBufferedWriteContent(0, 'B', second_packet_len, self_addr_, peer_addr_,
+ nullptr);
+}
+
+} // namespace
+} // namespace test
+} // namespace quic
diff --git a/quic/core/batch_writer/quic_batch_writer_test.cc b/quic/core/batch_writer/quic_batch_writer_test.cc
new file mode 100644
index 0000000..583b248
--- /dev/null
+++ b/quic/core/batch_writer/quic_batch_writer_test.cc
@@ -0,0 +1,78 @@
+// Copyright (c) 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/third_party/quiche/src/quic/core/batch_writer/quic_batch_writer_test.h"
+#include "net/third_party/quiche/src/quic/core/batch_writer/quic_gso_batch_writer.h"
+#include "net/third_party/quiche/src/quic/core/batch_writer/quic_sendmmsg_batch_writer.h"
+
+namespace quic {
+namespace test {
+namespace {
+
+class QuicGsoBatchWriterIOTestDelegate
+ : public QuicUdpBatchWriterIOTestDelegate {
+ public:
+ bool ShouldSkip(const QuicUdpBatchWriterIOTestParams& params) override {
+ QuicUdpSocketApi socket_api;
+ int fd =
+ socket_api.Create(params.address_family,
+ /*receive_buffer_size=*/kDefaultSocketReceiveBuffer,
+ /*send_buffer_size=*/kDefaultSocketReceiveBuffer);
+ if (fd < 0) {
+ QUIC_LOG(ERROR) << "CreateSocket() failed: " << strerror(errno);
+ return false; // Let the test fail rather than skip it.
+ }
+ const bool gso_not_supported =
+ QuicLinuxSocketUtils::GetUDPSegmentSize(fd) < 0;
+ socket_api.Destroy(fd);
+
+ if (gso_not_supported) {
+ QUIC_LOG(WARNING) << "Test skipped since GSO is not supported.";
+ return true;
+ }
+
+ QUIC_LOG(WARNING) << "OK: GSO is supported.";
+ return false;
+ }
+
+ void ResetWriter(int fd) override {
+ writer_ = std::make_unique<QuicGsoBatchWriter>(
+ std::make_unique<QuicBatchWriterBuffer>(), fd);
+ }
+
+ QuicUdpBatchWriter* GetWriter() override { return writer_.get(); }
+
+ private:
+ std::unique_ptr<QuicGsoBatchWriter> writer_;
+};
+
+INSTANTIATE_TEST_SUITE_P(
+ QuicGsoBatchWriterTest,
+ QuicUdpBatchWriterIOTest,
+ testing::ValuesIn(
+ MakeQuicBatchWriterTestParams<QuicGsoBatchWriterIOTestDelegate>()));
+
+class QuicSendmmsgBatchWriterIOTestDelegate
+ : public QuicUdpBatchWriterIOTestDelegate {
+ public:
+ void ResetWriter(int fd) override {
+ writer_ = std::make_unique<QuicSendmmsgBatchWriter>(
+ std::make_unique<QuicBatchWriterBuffer>(), fd);
+ }
+
+ QuicUdpBatchWriter* GetWriter() override { return writer_.get(); }
+
+ private:
+ std::unique_ptr<QuicSendmmsgBatchWriter> writer_;
+};
+
+INSTANTIATE_TEST_SUITE_P(
+ QuicSendmmsgBatchWriterTest,
+ QuicUdpBatchWriterIOTest,
+ testing::ValuesIn(MakeQuicBatchWriterTestParams<
+ QuicSendmmsgBatchWriterIOTestDelegate>()));
+
+} // namespace
+} // namespace test
+} // namespace quic
diff --git a/quic/core/batch_writer/quic_batch_writer_test.h b/quic/core/batch_writer/quic_batch_writer_test.h
new file mode 100644
index 0000000..b4f36b9
--- /dev/null
+++ b/quic/core/batch_writer/quic_batch_writer_test.h
@@ -0,0 +1,284 @@
+// Copyright (c) 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_PLATFORM_IMPL_BATCH_WRITER_QUIC_BATCH_WRITER_TEST_H_
+#define QUICHE_QUIC_PLATFORM_IMPL_BATCH_WRITER_QUIC_BATCH_WRITER_TEST_H_
+
+#include <sys/socket.h>
+#include <sys/types.h>
+
+#include <iostream>
+#include <utility>
+
+#include "net/third_party/quiche/src/quic/core/batch_writer/quic_batch_writer_base.h"
+#include "net/third_party/quiche/src/quic/core/quic_udp_socket.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_test.h"
+
+namespace quic {
+namespace test {
+
+static bool IsAddressFamilySupported(int address_family) {
+ static auto check_function = [](int address_family) {
+ int fd = socket(address_family, SOCK_STREAM, 0);
+ if (fd < 0) {
+ QUIC_LOG(ERROR) << "address_family not supported: " << address_family
+ << ", error: " << strerror(errno);
+ EXPECT_EQ(EAFNOSUPPORT, errno);
+ return false;
+ }
+ close(fd);
+ return true;
+ };
+
+ if (address_family == AF_INET) {
+ static const bool ipv4_supported = check_function(AF_INET);
+ return ipv4_supported;
+ }
+
+ static const bool ipv6_supported = check_function(AF_INET6);
+ return ipv6_supported;
+}
+
+static bool CreateSocket(int family, QuicSocketAddress* address, int* fd) {
+ if (family == AF_INET) {
+ *address = QuicSocketAddress(QuicIpAddress::Loopback4(), 0);
+ } else {
+ DCHECK_EQ(family, AF_INET6);
+ *address = QuicSocketAddress(QuicIpAddress::Loopback6(), 0);
+ }
+
+ QuicUdpSocketApi socket_api;
+ *fd = socket_api.Create(family,
+ /*receive_buffer_size=*/kDefaultSocketReceiveBuffer,
+ /*send_buffer_size=*/kDefaultSocketReceiveBuffer);
+ if (*fd < 0) {
+ QUIC_LOG(ERROR) << "CreateSocket() failed: " << strerror(errno);
+ return false;
+ }
+ socket_api.EnableDroppedPacketCount(*fd);
+
+ if (!socket_api.Bind(*fd, *address)) {
+ QUIC_LOG(ERROR) << "Bind failed: " << strerror(errno);
+ return false;
+ }
+
+ if (address->FromSocket(*fd) != 0) {
+ QUIC_LOG(ERROR) << "Unable to get self address. Error: "
+ << strerror(errno);
+ return false;
+ }
+ return true;
+}
+
+struct QuicUdpBatchWriterIOTestParams;
+class QUIC_EXPORT_PRIVATE QuicUdpBatchWriterIOTestDelegate {
+ public:
+ virtual ~QuicUdpBatchWriterIOTestDelegate() {}
+
+ virtual bool ShouldSkip(const QuicUdpBatchWriterIOTestParams& params) {
+ return false;
+ }
+
+ virtual void ResetWriter(int fd) = 0;
+
+ virtual QuicUdpBatchWriter* GetWriter() = 0;
+};
+
+struct QUIC_EXPORT_PRIVATE QuicUdpBatchWriterIOTestParams {
+ // Use shared_ptr because gtest makes copies of test params.
+ std::shared_ptr<QuicUdpBatchWriterIOTestDelegate> delegate;
+ int address_family;
+ int data_size;
+ int packet_size;
+
+ QUIC_EXPORT_PRIVATE friend std::ostream& operator<<(
+ std::ostream& os,
+ const QuicUdpBatchWriterIOTestParams& p) {
+ os << "{ address_family: " << p.address_family
+ << " data_size: " << p.data_size << " packet_size: " << p.packet_size
+ << " }";
+ return os;
+ }
+};
+
+template <class QuicUdpBatchWriterIOTestDelegateT>
+static std::vector<QuicUdpBatchWriterIOTestParams>
+MakeQuicBatchWriterTestParams() {
+ static_assert(std::is_base_of<QuicUdpBatchWriterIOTestDelegate,
+ QuicUdpBatchWriterIOTestDelegateT>::value,
+ "<QuicUdpBatchWriterIOTestDelegateT> needs to derive from "
+ "QuicUdpBatchWriterIOTestDelegate");
+
+ std::vector<QuicUdpBatchWriterIOTestParams> params;
+ for (int address_family : {AF_INET, AF_INET6}) {
+ for (int data_size : {1, 150, 1500, 15000, 64000, 512 * 1024}) {
+ for (int packet_size : {1, 50, 1350, 1452}) {
+ if (packet_size <= data_size && (data_size / packet_size < 2000)) {
+ params.push_back(
+ {std::make_unique<QuicUdpBatchWriterIOTestDelegateT>(),
+ address_family, data_size, packet_size});
+ }
+ }
+ }
+ }
+ return params;
+}
+
+// QuicUdpBatchWriterIOTest is a value parameterized test fixture that can be
+// used by tests of derived classes of QuicUdpBatchWriter, to verify basic
+// packet IO capabilities.
+class QUIC_EXPORT_PRIVATE QuicUdpBatchWriterIOTest
+ : public QuicTestWithParam<QuicUdpBatchWriterIOTestParams> {
+ protected:
+ QuicUdpBatchWriterIOTest()
+ : address_family_(GetParam().address_family),
+ data_size_(GetParam().data_size),
+ packet_size_(GetParam().packet_size),
+ self_socket_(-1),
+ peer_socket_(-1) {
+ QUIC_LOG(INFO) << "QuicUdpBatchWriterIOTestParams: " << GetParam();
+ EXPECT_TRUE(address_family_ == AF_INET || address_family_ == AF_INET6);
+ EXPECT_LE(packet_size_, data_size_);
+ EXPECT_LE(packet_size_, sizeof(packet_buffer_));
+ }
+
+ ~QuicUdpBatchWriterIOTest() override {
+ if (self_socket_ > 0) {
+ close(self_socket_);
+ }
+ if (peer_socket_ > 0) {
+ close(peer_socket_);
+ }
+ }
+
+ // Whether this test should be skipped. A test is passed if skipped.
+ // A test can be skipped when e.g. it exercises a kernel feature that is not
+ // available on the system.
+ bool ShouldSkip() {
+ if (!IsAddressFamilySupported(address_family_)) {
+ QUIC_LOG(WARNING)
+ << "Test skipped since address_family is not supported.";
+ return true;
+ }
+
+ return GetParam().delegate->ShouldSkip(GetParam());
+ }
+
+ // Initialize a test.
+ // To fail the test in Initialize, use ASSERT_xx macros.
+ void Initialize() {
+ ASSERT_TRUE(CreateSocket(address_family_, &self_address_, &self_socket_));
+ ASSERT_TRUE(CreateSocket(address_family_, &peer_address_, &peer_socket_));
+
+ QUIC_DLOG(INFO) << "Self address: " << self_address_.ToString() << ", fd "
+ << self_socket_;
+ QUIC_DLOG(INFO) << "Peer address: " << peer_address_.ToString() << ", fd "
+ << peer_socket_;
+ GetParam().delegate->ResetWriter(self_socket_);
+ }
+
+ QuicUdpBatchWriter* GetWriter() { return GetParam().delegate->GetWriter(); }
+
+ void ValidateWrite() {
+ char this_packet_content = '\0';
+ int this_packet_size;
+ int num_writes = 0;
+ int bytes_flushed = 0;
+ WriteResult result;
+
+ for (int bytes_sent = 0; bytes_sent < data_size_;
+ bytes_sent += this_packet_size, ++this_packet_content) {
+ this_packet_size = std::min(packet_size_, data_size_ - bytes_sent);
+ memset(&packet_buffer_[0], this_packet_content, this_packet_size);
+
+ result = GetWriter()->WritePacket(&packet_buffer_[0], this_packet_size,
+ self_address_.host(), peer_address_,
+ nullptr);
+
+ ASSERT_EQ(WRITE_STATUS_OK, result.status) << strerror(result.error_code);
+ bytes_flushed += result.bytes_written;
+ ++num_writes;
+
+ QUIC_DVLOG(1) << "[write #" << num_writes
+ << "] this_packet_size: " << this_packet_size
+ << ", total_bytes_sent: " << bytes_sent + this_packet_size
+ << ", bytes_flushed: " << bytes_flushed
+ << ", pkt content:" << std::hex << int(this_packet_content);
+ }
+
+ result = GetWriter()->Flush();
+ ASSERT_EQ(WRITE_STATUS_OK, result.status) << strerror(result.error_code);
+ bytes_flushed += result.bytes_written;
+ ASSERT_EQ(data_size_, bytes_flushed);
+
+ QUIC_LOG(INFO) << "Sent " << data_size_ << " bytes in " << num_writes
+ << " writes.";
+ }
+
+ void ValidateRead() {
+ char this_packet_content = '\0';
+ int this_packet_size;
+ int packets_received = 0;
+ for (int bytes_received = 0; bytes_received < data_size_;
+ bytes_received += this_packet_size, ++this_packet_content) {
+ this_packet_size = std::min(packet_size_, data_size_ - bytes_received);
+ SCOPED_TRACE(testing::Message()
+ << "Before ReadPacket: bytes_received=" << bytes_received
+ << ", this_packet_size=" << this_packet_size);
+
+ QuicUdpSocketApi::ReadPacketResult result;
+ result.packet_buffer = {&packet_buffer_[0], sizeof(packet_buffer_)};
+ result.control_buffer = {&control_buffer_[0], sizeof(control_buffer_)};
+ QuicUdpSocketApi().ReadPacket(
+ peer_socket_,
+ quic::BitMask64(QuicUdpPacketInfoBit::V4_SELF_IP,
+ QuicUdpPacketInfoBit::V6_SELF_IP,
+ QuicUdpPacketInfoBit::PEER_ADDRESS),
+ &result);
+ ASSERT_TRUE(result.ok);
+ ASSERT_TRUE(
+ result.packet_info.HasValue(QuicUdpPacketInfoBit::PEER_ADDRESS));
+ QuicSocketAddress read_peer_address = result.packet_info.peer_address();
+ QuicIpAddress read_self_address = read_peer_address.host().IsIPv6()
+ ? result.packet_info.self_v6_ip()
+ : result.packet_info.self_v4_ip();
+
+ EXPECT_EQ(read_self_address, peer_address_.host());
+ EXPECT_EQ(read_peer_address, self_address_);
+ for (int i = 0; i < this_packet_size; ++i) {
+ EXPECT_EQ(this_packet_content, packet_buffer_[i]);
+ }
+ packets_received += this_packet_size;
+ }
+
+ QUIC_LOG(INFO) << "Received " << data_size_ << " bytes in "
+ << packets_received << " packets.";
+ }
+
+ QuicSocketAddress self_address_;
+ QuicSocketAddress peer_address_;
+ char packet_buffer_[1500];
+ char control_buffer_[kDefaultUdpPacketControlBufferSize];
+ int address_family_;
+ const int data_size_;
+ const int packet_size_;
+ int self_socket_;
+ int peer_socket_;
+};
+
+TEST_P(QuicUdpBatchWriterIOTest, WriteAndRead) {
+ if (ShouldSkip()) {
+ return;
+ }
+
+ Initialize();
+
+ ValidateWrite();
+ ValidateRead();
+}
+
+} // namespace test
+} // namespace quic
+
+#endif // QUICHE_QUIC_PLATFORM_IMPL_BATCH_WRITER_QUIC_BATCH_WRITER_TEST_H_
diff --git a/quic/core/batch_writer/quic_gso_batch_writer.cc b/quic/core/batch_writer/quic_gso_batch_writer.cc
new file mode 100644
index 0000000..56c0e10
--- /dev/null
+++ b/quic/core/batch_writer/quic_gso_batch_writer.cc
@@ -0,0 +1,122 @@
+// Copyright (c) 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/third_party/quiche/src/quic/core/batch_writer/quic_gso_batch_writer.h"
+
+#include <time.h>
+#include <ctime>
+
+#include "net/third_party/quiche/src/quic/core/quic_linux_socket_utils.h"
+
+namespace quic {
+
+QuicGsoBatchWriter::QuicGsoBatchWriter(
+ std::unique_ptr<QuicBatchWriterBuffer> batch_buffer,
+ int fd)
+ : QuicGsoBatchWriter(std::move(batch_buffer), fd, CLOCK_MONOTONIC) {}
+
+QuicGsoBatchWriter::QuicGsoBatchWriter(
+ std::unique_ptr<QuicBatchWriterBuffer> batch_buffer,
+ int fd,
+ clockid_t clockid_for_release_time)
+ : QuicUdpBatchWriter(std::move(batch_buffer), fd),
+ clockid_for_release_time_(clockid_for_release_time),
+ supports_release_time_(
+ GetQuicRestartFlag(quic_support_release_time_for_gso) &&
+ QuicLinuxSocketUtils::EnableReleaseTime(fd,
+ clockid_for_release_time)) {
+ if (supports_release_time_) {
+ QUIC_RESTART_FLAG_COUNT(quic_support_release_time_for_gso);
+ QUIC_LOG_FIRST_N(INFO, 5) << "Release time is enabled.";
+ } else {
+ QUIC_LOG_FIRST_N(INFO, 5) << "Release time is not enabled.";
+ }
+}
+
+QuicGsoBatchWriter::QuicGsoBatchWriter(
+ std::unique_ptr<QuicBatchWriterBuffer> batch_buffer,
+ int fd,
+ clockid_t clockid_for_release_time,
+ ReleaseTimeForceEnabler enabler)
+ : QuicUdpBatchWriter(std::move(batch_buffer), fd),
+ clockid_for_release_time_(clockid_for_release_time),
+ supports_release_time_(true) {
+ QUIC_DLOG(INFO) << "Release time forcefully enabled.";
+}
+
+QuicGsoBatchWriter::CanBatchResult QuicGsoBatchWriter::CanBatch(
+ const char* buffer,
+ size_t buf_len,
+ const QuicIpAddress& self_address,
+ const QuicSocketAddress& peer_address,
+ const PerPacketOptions* options,
+ uint64_t release_time) const {
+ // If there is nothing buffered already, this write will be included in this
+ // batch.
+ if (buffered_writes().empty()) {
+ return CanBatchResult(/*can_batch=*/true, /*must_flush=*/false);
+ }
+
+ // The new write can be batched if all of the following are true:
+ // [0] The total number of the GSO segments(one write=one segment, including
+ // the new write) must not exceed |max_segments|.
+ // [1] It has the same source and destination addresses as already buffered
+ // writes.
+ // [2] It won't cause this batch to exceed kMaxGsoPacketSize.
+ // [3] Already buffered writes all have the same length.
+ // [4] Length of already buffered writes must >= length of the new write.
+ // [5] The new packet has the same release time as buffered writes.
+ const BufferedWrite& first = buffered_writes().front();
+ const BufferedWrite& last = buffered_writes().back();
+ size_t max_segments = MaxSegments(first.buf_len);
+ bool can_batch =
+ buffered_writes().size() < max_segments && // [0]
+ last.self_address == self_address && // [1]
+ last.peer_address == peer_address && // [1]
+ batch_buffer().SizeInUse() + buf_len <= kMaxGsoPacketSize && // [2]
+ first.buf_len == last.buf_len && // [3]
+ first.buf_len >= buf_len && // [4]
+ (!SupportsReleaseTime() || first.release_time == release_time); // [5]
+
+ // A flush is required if any of the following is true:
+ // [a] The new write can't be batched.
+ // [b] Length of the new write is different from the length of already
+ // buffered writes.
+ // [c] The total number of the GSO segments, including the new write, reaches
+ // |max_segments|.
+ bool must_flush = (!can_batch) || // [a]
+ (last.buf_len != buf_len) || // [b]
+ (buffered_writes().size() + 1 == max_segments); // [c]
+ return CanBatchResult(can_batch, must_flush);
+}
+
+uint64_t QuicGsoBatchWriter::NowInNanosForReleaseTime() const {
+ struct timespec ts;
+
+ if (clock_gettime(clockid_for_release_time_, &ts) != 0) {
+ return 0;
+ }
+
+ return ts.tv_sec * (1000ULL * 1000 * 1000) + ts.tv_nsec;
+}
+
+// static
+void QuicGsoBatchWriter::BuildCmsg(QuicMsgHdr* hdr,
+ const QuicIpAddress& self_address,
+ uint16_t gso_size,
+ uint64_t release_time) {
+ hdr->SetIpInNextCmsg(self_address);
+ if (gso_size > 0) {
+ *hdr->GetNextCmsgData<uint16_t>(SOL_UDP, UDP_SEGMENT) = gso_size;
+ }
+ if (release_time != 0) {
+ *hdr->GetNextCmsgData<uint64_t>(SOL_SOCKET, SO_TXTIME) = release_time;
+ }
+}
+
+QuicGsoBatchWriter::FlushImplResult QuicGsoBatchWriter::FlushImpl() {
+ return InternalFlushImpl<kCmsgSpace>(BuildCmsg);
+}
+
+} // namespace quic
diff --git a/quic/core/batch_writer/quic_gso_batch_writer.h b/quic/core/batch_writer/quic_gso_batch_writer.h
new file mode 100644
index 0000000..64c7e02
--- /dev/null
+++ b/quic/core/batch_writer/quic_gso_batch_writer.h
@@ -0,0 +1,114 @@
+// Copyright (c) 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_PLATFORM_IMPL_BATCH_WRITER_QUIC_GSO_BATCH_WRITER_H_
+#define QUICHE_QUIC_PLATFORM_IMPL_BATCH_WRITER_QUIC_GSO_BATCH_WRITER_H_
+
+#include "net/third_party/quiche/src/quic/core/batch_writer/quic_batch_writer_base.h"
+
+namespace quic {
+
+// QuicGsoBatchWriter sends QUIC packets in batches, using UDP socket's generic
+// segmentation offload(GSO) capability.
+class QUIC_EXPORT_PRIVATE QuicGsoBatchWriter : public QuicUdpBatchWriter {
+ public:
+ QuicGsoBatchWriter(std::unique_ptr<QuicBatchWriterBuffer> batch_buffer,
+ int fd);
+
+ // |clockid_for_release_time|: FQ qdisc requires CLOCK_MONOTONIC, EDF requires
+ // CLOCK_TAI.
+ QuicGsoBatchWriter(std::unique_ptr<QuicBatchWriterBuffer> batch_buffer,
+ int fd,
+ clockid_t clockid_for_release_time);
+
+ bool SupportsReleaseTime() const final { return supports_release_time_; }
+
+ CanBatchResult CanBatch(const char* buffer,
+ size_t buf_len,
+ const QuicIpAddress& self_address,
+ const QuicSocketAddress& peer_address,
+ const PerPacketOptions* options,
+ uint64_t release_time) const override;
+
+ FlushImplResult FlushImpl() override;
+
+ protected:
+ // Test only constructor to forcefully enable release time.
+ struct QUIC_EXPORT_PRIVATE ReleaseTimeForceEnabler {};
+ QuicGsoBatchWriter(std::unique_ptr<QuicBatchWriterBuffer> batch_buffer,
+ int fd,
+ clockid_t clockid_for_release_time,
+ ReleaseTimeForceEnabler enabler);
+
+ uint64_t NowInNanosForReleaseTime() const override;
+
+ static size_t MaxSegments(size_t gso_size) {
+ // Max segments should be the min of UDP_MAX_SEGMENTS(64) and
+ // (((64KB - sizeof(ip hdr) - sizeof(udp hdr)) / MSS) + 1), in the typical
+ // case of IPv6 packets with 1500-byte MTU, the result is
+ // ((64KB - 40 - 8) / (1500 - 48)) + 1 = 46
+ // However, due a kernel bug, the limit is much lower for tiny gso_sizes.
+ return gso_size <= 2 ? 16 : 45;
+ }
+
+ static const int kCmsgSpace =
+ kCmsgSpaceForIp + kCmsgSpaceForSegmentSize + kCmsgSpaceForTxTime;
+ static void BuildCmsg(QuicMsgHdr* hdr,
+ const QuicIpAddress& self_address,
+ uint16_t gso_size,
+ uint64_t release_time);
+
+ template <size_t CmsgSpace, typename CmsgBuilderT>
+ FlushImplResult InternalFlushImpl(CmsgBuilderT cmsg_builder) {
+ DCHECK(!IsWriteBlocked());
+ DCHECK(!buffered_writes().empty());
+
+ FlushImplResult result = {WriteResult(WRITE_STATUS_OK, 0),
+ /*num_packets_sent=*/0, /*bytes_written=*/0};
+ WriteResult& write_result = result.write_result;
+
+ int total_bytes = batch_buffer().SizeInUse();
+ const BufferedWrite& first = buffered_writes().front();
+ char cbuf[CmsgSpace];
+ QuicMsgHdr hdr(first.buffer, total_bytes, first.peer_address, cbuf,
+ sizeof(cbuf));
+
+ uint16_t gso_size = buffered_writes().size() > 1 ? first.buf_len : 0;
+ cmsg_builder(&hdr, first.self_address, gso_size, first.release_time);
+
+ write_result = QuicLinuxSocketUtils::WritePacket(fd(), hdr);
+ QUIC_DVLOG(1) << "Write GSO packet result: " << write_result
+ << ", fd: " << fd()
+ << ", self_address: " << first.self_address.ToString()
+ << ", peer_address: " << first.peer_address.ToString()
+ << ", num_segments: " << buffered_writes().size()
+ << ", total_bytes: " << total_bytes
+ << ", gso_size: " << gso_size;
+
+ // All segments in a GSO packet share the same fate - if the write failed,
+ // none of them are sent, and it's not needed to call PopBufferedWrite().
+ if (write_result.status != WRITE_STATUS_OK) {
+ return result;
+ }
+
+ result.num_packets_sent = buffered_writes().size();
+
+ write_result.bytes_written = total_bytes;
+ result.bytes_written = total_bytes;
+
+ batch_buffer().PopBufferedWrite(buffered_writes().size());
+
+ QUIC_BUG_IF(!buffered_writes().empty())
+ << "All packets should have been written on a successful return";
+ return result;
+ }
+
+ private:
+ const clockid_t clockid_for_release_time_;
+ const bool supports_release_time_;
+};
+
+} // namespace quic
+
+#endif // QUICHE_QUIC_PLATFORM_IMPL_BATCH_WRITER_QUIC_GSO_BATCH_WRITER_H_
diff --git a/quic/core/batch_writer/quic_gso_batch_writer_test.cc b/quic/core/batch_writer/quic_gso_batch_writer_test.cc
new file mode 100644
index 0000000..c7d695a
--- /dev/null
+++ b/quic/core/batch_writer/quic_gso_batch_writer_test.cc
@@ -0,0 +1,461 @@
+// Copyright (c) 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/third_party/quiche/src/quic/core/batch_writer/quic_gso_batch_writer.h"
+
+#include <cstdint>
+#include <limits>
+#include <memory>
+#include <utility>
+
+#include "net/third_party/quiche/src/quic/platform/api/quic_ip_address.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_test.h"
+#include "net/third_party/quiche/src/quic/test_tools/quic_mock_syscall_wrapper.h"
+
+using testing::_;
+using testing::Invoke;
+using testing::StrictMock;
+
+namespace quic {
+namespace test {
+namespace {
+
+size_t PacketLength(const msghdr* msg) {
+ size_t length = 0;
+ for (size_t i = 0; i < msg->msg_iovlen; ++i) {
+ length += msg->msg_iov[i].iov_len;
+ }
+ return length;
+}
+
+uint64_t MillisToNanos(uint64_t milliseconds) {
+ return milliseconds * 1000000;
+}
+
+class QUIC_EXPORT_PRIVATE TestQuicGsoBatchWriter : public QuicGsoBatchWriter {
+ public:
+ using QuicGsoBatchWriter::batch_buffer;
+ using QuicGsoBatchWriter::buffered_writes;
+ using QuicGsoBatchWriter::CanBatch;
+ using QuicGsoBatchWriter::CanBatchResult;
+ using QuicGsoBatchWriter::GetReleaseTime;
+ using QuicGsoBatchWriter::MaxSegments;
+ using QuicGsoBatchWriter::QuicGsoBatchWriter;
+
+ static std::unique_ptr<TestQuicGsoBatchWriter>
+ NewInstanceWithReleaseTimeSupport() {
+ return std::unique_ptr<TestQuicGsoBatchWriter>(new TestQuicGsoBatchWriter(
+ std::make_unique<QuicBatchWriterBuffer>(),
+ /*fd=*/-1, CLOCK_MONOTONIC, ReleaseTimeForceEnabler()));
+ }
+
+ uint64_t NowInNanosForReleaseTime() const override {
+ return MillisToNanos(forced_release_time_ms_);
+ }
+
+ void ForceReleaseTimeMs(uint64_t forced_release_time_ms) {
+ forced_release_time_ms_ = forced_release_time_ms;
+ }
+
+ private:
+ uint64_t forced_release_time_ms_ = 1;
+};
+
+struct QUIC_EXPORT_PRIVATE TestPerPacketOptions : public PerPacketOptions {
+ std::unique_ptr<quic::PerPacketOptions> Clone() const override {
+ return std::make_unique<TestPerPacketOptions>(*this);
+ }
+};
+
+// TestBufferedWrite is a copy-constructible BufferedWrite.
+struct QUIC_EXPORT_PRIVATE TestBufferedWrite : public BufferedWrite {
+ using BufferedWrite::BufferedWrite;
+ TestBufferedWrite(const TestBufferedWrite& other)
+ : BufferedWrite(other.buffer,
+ other.buf_len,
+ other.self_address,
+ other.peer_address,
+ other.options ? other.options->Clone()
+ : std::unique_ptr<PerPacketOptions>(),
+ other.release_time) {}
+};
+
+// Pointed to by all instances of |BatchCriteriaTestData|. Content not used.
+static char unused_packet_buffer[kMaxOutgoingPacketSize];
+
+struct QUIC_EXPORT_PRIVATE BatchCriteriaTestData {
+ BatchCriteriaTestData(size_t buf_len,
+ const QuicIpAddress& self_address,
+ const QuicSocketAddress& peer_address,
+ uint64_t release_time,
+ bool can_batch,
+ bool must_flush)
+ : buffered_write(unused_packet_buffer,
+ buf_len,
+ self_address,
+ peer_address,
+ std::unique_ptr<PerPacketOptions>(),
+ release_time),
+ can_batch(can_batch),
+ must_flush(must_flush) {}
+
+ TestBufferedWrite buffered_write;
+ // Expected value of CanBatchResult.can_batch when batching |buffered_write|.
+ bool can_batch;
+ // Expected value of CanBatchResult.must_flush when batching |buffered_write|.
+ bool must_flush;
+};
+
+std::vector<BatchCriteriaTestData> BatchCriteriaTestData_SizeDecrease() {
+ const QuicIpAddress self_addr;
+ const QuicSocketAddress peer_addr;
+ std::vector<BatchCriteriaTestData> test_data_table = {
+ // clang-format off
+ // buf_len self_addr peer_addr t_rel can_batch must_flush
+ {1350, self_addr, peer_addr, 0, true, false},
+ {1350, self_addr, peer_addr, 0, true, false},
+ {1350, self_addr, peer_addr, 0, true, false},
+ {39, self_addr, peer_addr, 0, true, true},
+ {39, self_addr, peer_addr, 0, false, true},
+ {1350, self_addr, peer_addr, 0, false, true},
+ // clang-format on
+ };
+ return test_data_table;
+}
+
+std::vector<BatchCriteriaTestData> BatchCriteriaTestData_SizeIncrease() {
+ const QuicIpAddress self_addr;
+ const QuicSocketAddress peer_addr;
+ std::vector<BatchCriteriaTestData> test_data_table = {
+ // clang-format off
+ // buf_len self_addr peer_addr t_rel can_batch must_flush
+ {1350, self_addr, peer_addr, 0, true, false},
+ {1350, self_addr, peer_addr, 0, true, false},
+ {1350, self_addr, peer_addr, 0, true, false},
+ {1351, self_addr, peer_addr, 0, false, true},
+ // clang-format on
+ };
+ return test_data_table;
+}
+
+std::vector<BatchCriteriaTestData> BatchCriteriaTestData_AddressChange() {
+ const QuicIpAddress self_addr1 = QuicIpAddress::Loopback4();
+ const QuicIpAddress self_addr2 = QuicIpAddress::Loopback6();
+ const QuicSocketAddress peer_addr1(self_addr1, 666);
+ const QuicSocketAddress peer_addr2(self_addr1, 777);
+ const QuicSocketAddress peer_addr3(self_addr2, 666);
+ const QuicSocketAddress peer_addr4(self_addr2, 777);
+ std::vector<BatchCriteriaTestData> test_data_table = {
+ // clang-format off
+ // buf_len self_addr peer_addr t_rel can_batch must_flush
+ {1350, self_addr1, peer_addr1, 0, true, false},
+ {1350, self_addr1, peer_addr1, 0, true, false},
+ {1350, self_addr1, peer_addr1, 0, true, false},
+ {1350, self_addr2, peer_addr1, 0, false, true},
+ {1350, self_addr1, peer_addr2, 0, false, true},
+ {1350, self_addr1, peer_addr3, 0, false, true},
+ {1350, self_addr1, peer_addr4, 0, false, true},
+ {1350, self_addr1, peer_addr4, 0, false, true},
+ // clang-format on
+ };
+ return test_data_table;
+}
+
+std::vector<BatchCriteriaTestData> BatchCriteriaTestData_ReleaseTime1() {
+ const QuicIpAddress self_addr;
+ const QuicSocketAddress peer_addr;
+ std::vector<BatchCriteriaTestData> test_data_table = {
+ // clang-format off
+ // buf_len self_addr peer_addr t_rel can_batch must_flush
+ {1350, self_addr, peer_addr, 5, true, false},
+ {1350, self_addr, peer_addr, 5, true, false},
+ {1350, self_addr, peer_addr, 5, true, false},
+ {1350, self_addr, peer_addr, 9, false, true},
+ // clang-format on
+ };
+ return test_data_table;
+}
+
+std::vector<BatchCriteriaTestData> BatchCriteriaTestData_ReleaseTime2() {
+ const QuicIpAddress self_addr;
+ const QuicSocketAddress peer_addr;
+ std::vector<BatchCriteriaTestData> test_data_table = {
+ // clang-format off
+ // buf_len self_addr peer_addr t_rel can_batch must_flush
+ {1350, self_addr, peer_addr, 0, true, false},
+ {1350, self_addr, peer_addr, 0, true, false},
+ {1350, self_addr, peer_addr, 0, true, false},
+ {1350, self_addr, peer_addr, 9, false, true},
+ // clang-format on
+ };
+ return test_data_table;
+}
+
+std::vector<BatchCriteriaTestData> BatchCriteriaTestData_MaxSegments(
+ size_t gso_size) {
+ const QuicIpAddress self_addr;
+ const QuicSocketAddress peer_addr;
+ std::vector<BatchCriteriaTestData> test_data_table;
+ size_t max_segments = TestQuicGsoBatchWriter::MaxSegments(gso_size);
+ for (int i = 0; i < max_segments; ++i) {
+ bool is_last_in_batch = (i + 1 == max_segments);
+ test_data_table.push_back({gso_size, self_addr, peer_addr,
+ /*release_time=*/0, true, is_last_in_batch});
+ }
+ test_data_table.push_back(
+ {gso_size, self_addr, peer_addr, /*release_time=*/0, false, true});
+ return test_data_table;
+}
+
+class QuicGsoBatchWriterTest : public QuicTest {
+ protected:
+ WriteResult WritePacket(QuicGsoBatchWriter* writer, size_t packet_size) {
+ return writer->WritePacket(&packet_buffer_[0], packet_size, self_address_,
+ peer_address_, nullptr);
+ }
+
+ WriteResult WritePacketWithOptions(QuicGsoBatchWriter* writer,
+ PerPacketOptions* options) {
+ return writer->WritePacket(&packet_buffer_[0], 1350, self_address_,
+ peer_address_, options);
+ }
+
+ QuicIpAddress self_address_ = QuicIpAddress::Any4();
+ QuicSocketAddress peer_address_{QuicIpAddress::Any4(), 443};
+ char packet_buffer_[1500];
+ StrictMock<MockQuicSyscallWrapper> mock_syscalls_;
+ ScopedGlobalSyscallWrapperOverride syscall_override_{&mock_syscalls_};
+};
+
+TEST_F(QuicGsoBatchWriterTest, BatchCriteria) {
+ std::unique_ptr<TestQuicGsoBatchWriter> writer;
+
+ std::vector<std::vector<BatchCriteriaTestData>> test_data_tables;
+ test_data_tables.emplace_back(BatchCriteriaTestData_SizeDecrease());
+ test_data_tables.emplace_back(BatchCriteriaTestData_SizeIncrease());
+ test_data_tables.emplace_back(BatchCriteriaTestData_AddressChange());
+ test_data_tables.emplace_back(BatchCriteriaTestData_ReleaseTime1());
+ test_data_tables.emplace_back(BatchCriteriaTestData_ReleaseTime2());
+ test_data_tables.emplace_back(BatchCriteriaTestData_MaxSegments(1));
+ test_data_tables.emplace_back(BatchCriteriaTestData_MaxSegments(2));
+ test_data_tables.emplace_back(BatchCriteriaTestData_MaxSegments(1350));
+
+ for (size_t i = 0; i < test_data_tables.size(); ++i) {
+ writer = TestQuicGsoBatchWriter::NewInstanceWithReleaseTimeSupport();
+
+ const auto& test_data_table = test_data_tables[i];
+ for (size_t j = 0; j < test_data_table.size(); ++j) {
+ const BatchCriteriaTestData& test_data = test_data_table[j];
+ SCOPED_TRACE(testing::Message() << "i=" << i << ", j=" << j);
+ TestQuicGsoBatchWriter::CanBatchResult result = writer->CanBatch(
+ test_data.buffered_write.buffer, test_data.buffered_write.buf_len,
+ test_data.buffered_write.self_address,
+ test_data.buffered_write.peer_address,
+ /*options=*/nullptr, test_data.buffered_write.release_time);
+
+ ASSERT_EQ(test_data.can_batch, result.can_batch);
+ ASSERT_EQ(test_data.must_flush, result.must_flush);
+
+ if (result.can_batch) {
+ ASSERT_TRUE(
+ writer->batch_buffer()
+ .PushBufferedWrite(test_data.buffered_write.buffer,
+ test_data.buffered_write.buf_len,
+ test_data.buffered_write.self_address,
+ test_data.buffered_write.peer_address,
+ /*options=*/nullptr,
+ test_data.buffered_write.release_time)
+ .succeeded);
+ }
+ }
+ }
+}
+
+TEST_F(QuicGsoBatchWriterTest, WriteSuccess) {
+ TestQuicGsoBatchWriter writer(std::make_unique<QuicBatchWriterBuffer>(),
+ /*fd=*/-1);
+
+ ASSERT_EQ(WriteResult(WRITE_STATUS_OK, 0), WritePacket(&writer, 1000));
+
+ EXPECT_CALL(mock_syscalls_, Sendmsg(_, _, _))
+ .WillOnce(Invoke([](int sockfd, const msghdr* msg, int flags) {
+ EXPECT_EQ(1100u, PacketLength(msg));
+ return 1100;
+ }));
+ ASSERT_EQ(WriteResult(WRITE_STATUS_OK, 1100), WritePacket(&writer, 100));
+ ASSERT_EQ(0u, writer.batch_buffer().SizeInUse());
+ ASSERT_EQ(0u, writer.buffered_writes().size());
+}
+
+TEST_F(QuicGsoBatchWriterTest, WriteBlockDataNotBuffered) {
+ TestQuicGsoBatchWriter writer(std::make_unique<QuicBatchWriterBuffer>(),
+ /*fd=*/-1);
+
+ ASSERT_EQ(WriteResult(WRITE_STATUS_OK, 0), WritePacket(&writer, 100));
+ ASSERT_EQ(WriteResult(WRITE_STATUS_OK, 0), WritePacket(&writer, 100));
+
+ EXPECT_CALL(mock_syscalls_, Sendmsg(_, _, _))
+ .WillOnce(Invoke([](int sockfd, const msghdr* msg, int flags) {
+ EXPECT_EQ(200u, PacketLength(msg));
+ errno = EWOULDBLOCK;
+ return -1;
+ }));
+ ASSERT_EQ(WriteResult(WRITE_STATUS_BLOCKED, EWOULDBLOCK),
+ WritePacket(&writer, 150));
+ ASSERT_EQ(200u, writer.batch_buffer().SizeInUse());
+ ASSERT_EQ(2u, writer.buffered_writes().size());
+}
+
+TEST_F(QuicGsoBatchWriterTest, WriteBlockDataBuffered) {
+ TestQuicGsoBatchWriter writer(std::make_unique<QuicBatchWriterBuffer>(),
+ /*fd=*/-1);
+
+ ASSERT_EQ(WriteResult(WRITE_STATUS_OK, 0), WritePacket(&writer, 100));
+ ASSERT_EQ(WriteResult(WRITE_STATUS_OK, 0), WritePacket(&writer, 100));
+
+ EXPECT_CALL(mock_syscalls_, Sendmsg(_, _, _))
+ .WillOnce(Invoke([](int sockfd, const msghdr* msg, int flags) {
+ EXPECT_EQ(250u, PacketLength(msg));
+ errno = EWOULDBLOCK;
+ return -1;
+ }));
+ ASSERT_EQ(WriteResult(WRITE_STATUS_BLOCKED_DATA_BUFFERED, EWOULDBLOCK),
+ WritePacket(&writer, 50));
+ ASSERT_EQ(250u, writer.batch_buffer().SizeInUse());
+ ASSERT_EQ(3u, writer.buffered_writes().size());
+}
+
+TEST_F(QuicGsoBatchWriterTest, WriteErrorWithoutDataBuffered) {
+ TestQuicGsoBatchWriter writer(std::make_unique<QuicBatchWriterBuffer>(),
+ /*fd=*/-1);
+
+ ASSERT_EQ(WriteResult(WRITE_STATUS_OK, 0), WritePacket(&writer, 100));
+ ASSERT_EQ(WriteResult(WRITE_STATUS_OK, 0), WritePacket(&writer, 100));
+
+ EXPECT_CALL(mock_syscalls_, Sendmsg(_, _, _))
+ .WillOnce(Invoke([](int sockfd, const msghdr* msg, int flags) {
+ EXPECT_EQ(200u, PacketLength(msg));
+ errno = EPERM;
+ return -1;
+ }));
+ WriteResult error_result = WritePacket(&writer, 150);
+ ASSERT_EQ(WriteResult(WRITE_STATUS_ERROR, EPERM), error_result);
+
+ ASSERT_EQ(3u, error_result.dropped_packets);
+ ASSERT_EQ(0u, writer.batch_buffer().SizeInUse());
+ ASSERT_EQ(0u, writer.buffered_writes().size());
+}
+
+TEST_F(QuicGsoBatchWriterTest, WriteErrorAfterDataBuffered) {
+ TestQuicGsoBatchWriter writer(std::make_unique<QuicBatchWriterBuffer>(),
+ /*fd=*/-1);
+
+ ASSERT_EQ(WriteResult(WRITE_STATUS_OK, 0), WritePacket(&writer, 100));
+ ASSERT_EQ(WriteResult(WRITE_STATUS_OK, 0), WritePacket(&writer, 100));
+
+ EXPECT_CALL(mock_syscalls_, Sendmsg(_, _, _))
+ .WillOnce(Invoke([](int sockfd, const msghdr* msg, int flags) {
+ EXPECT_EQ(250u, PacketLength(msg));
+ errno = EPERM;
+ return -1;
+ }));
+ WriteResult error_result = WritePacket(&writer, 50);
+ ASSERT_EQ(WriteResult(WRITE_STATUS_ERROR, EPERM), error_result);
+
+ ASSERT_EQ(3u, error_result.dropped_packets);
+ ASSERT_EQ(0u, writer.batch_buffer().SizeInUse());
+ ASSERT_EQ(0u, writer.buffered_writes().size());
+}
+
+TEST_F(QuicGsoBatchWriterTest, FlushError) {
+ TestQuicGsoBatchWriter writer(std::make_unique<QuicBatchWriterBuffer>(),
+ /*fd=*/-1);
+
+ ASSERT_EQ(WriteResult(WRITE_STATUS_OK, 0), WritePacket(&writer, 100));
+ ASSERT_EQ(WriteResult(WRITE_STATUS_OK, 0), WritePacket(&writer, 100));
+
+ EXPECT_CALL(mock_syscalls_, Sendmsg(_, _, _))
+ .WillOnce(Invoke([](int sockfd, const msghdr* msg, int flags) {
+ EXPECT_EQ(200u, PacketLength(msg));
+ errno = EINVAL;
+ return -1;
+ }));
+ WriteResult error_result = writer.Flush();
+ ASSERT_EQ(WriteResult(WRITE_STATUS_ERROR, EINVAL), error_result);
+
+ ASSERT_EQ(2u, error_result.dropped_packets);
+ ASSERT_EQ(0u, writer.batch_buffer().SizeInUse());
+ ASSERT_EQ(0u, writer.buffered_writes().size());
+}
+
+TEST_F(QuicGsoBatchWriterTest, ReleaseTimeNullOptions) {
+ auto writer = TestQuicGsoBatchWriter::NewInstanceWithReleaseTimeSupport();
+ EXPECT_EQ(0, writer->GetReleaseTime(nullptr));
+}
+
+TEST_F(QuicGsoBatchWriterTest, ReleaseTime) {
+ const WriteResult write_buffered(WRITE_STATUS_OK, 0);
+
+ auto writer = TestQuicGsoBatchWriter::NewInstanceWithReleaseTimeSupport();
+
+ TestPerPacketOptions options;
+ EXPECT_TRUE(options.release_time_delay.IsZero());
+ EXPECT_FALSE(options.allow_burst);
+ EXPECT_EQ(MillisToNanos(1), writer->GetReleaseTime(&options));
+
+ // The 1st packet has no delay.
+ ASSERT_EQ(write_buffered, WritePacketWithOptions(writer.get(), &options));
+ EXPECT_EQ(MillisToNanos(1), writer->buffered_writes().back().release_time);
+
+ // The 2nd packet has some delay, but allows burst.
+ options.release_time_delay = QuicTime::Delta::FromMilliseconds(3);
+ options.allow_burst = true;
+ ASSERT_EQ(write_buffered, WritePacketWithOptions(writer.get(), &options));
+ EXPECT_EQ(MillisToNanos(1), writer->buffered_writes().back().release_time);
+
+ // The 3rd packet has more delay and does not allow burst.
+ // The first 2 packets are flushed due to different release time.
+ EXPECT_CALL(mock_syscalls_, Sendmsg(_, _, _))
+ .WillOnce(Invoke([](int sockfd, const msghdr* msg, int flags) {
+ EXPECT_EQ(2700u, PacketLength(msg));
+ errno = 0;
+ return 0;
+ }));
+ options.release_time_delay = QuicTime::Delta::FromMilliseconds(5);
+ options.allow_burst = false;
+ ASSERT_EQ(WriteResult(WRITE_STATUS_OK, 2700),
+ WritePacketWithOptions(writer.get(), &options));
+ EXPECT_EQ(MillisToNanos(6), writer->buffered_writes().back().release_time);
+
+ // The 4th packet has same delay, but allows burst.
+ options.allow_burst = true;
+ ASSERT_EQ(write_buffered, WritePacketWithOptions(writer.get(), &options));
+ EXPECT_EQ(MillisToNanos(6), writer->buffered_writes().back().release_time);
+
+ // The 5th packet has same delay, allows burst, but is shorter.
+ // Packets 3,4 and 5 are flushed.
+ EXPECT_CALL(mock_syscalls_, Sendmsg(_, _, _))
+ .WillOnce(Invoke([](int sockfd, const msghdr* msg, int flags) {
+ EXPECT_EQ(3000u, PacketLength(msg));
+ errno = 0;
+ return 0;
+ }));
+ options.allow_burst = true;
+ EXPECT_EQ(MillisToNanos(6), writer->GetReleaseTime(&options));
+ ASSERT_EQ(WriteResult(WRITE_STATUS_OK, 3000),
+ writer->WritePacket(&packet_buffer_[0], 300, self_address_,
+ peer_address_, &options));
+ EXPECT_TRUE(writer->buffered_writes().empty());
+
+ // Pretend 1ms has elapsed and the 6th packet has 1ms less delay. In other
+ // words, the release time should still be the same as packets 3-5.
+ writer->ForceReleaseTimeMs(2);
+ options.release_time_delay = QuicTime::Delta::FromMilliseconds(4);
+ ASSERT_EQ(write_buffered, WritePacketWithOptions(writer.get(), &options));
+ EXPECT_EQ(MillisToNanos(6), writer->buffered_writes().back().release_time);
+}
+
+} // namespace
+} // namespace test
+} // namespace quic
diff --git a/quic/core/batch_writer/quic_sendmmsg_batch_writer.cc b/quic/core/batch_writer/quic_sendmmsg_batch_writer.cc
new file mode 100644
index 0000000..efd2179
--- /dev/null
+++ b/quic/core/batch_writer/quic_sendmmsg_batch_writer.cc
@@ -0,0 +1,83 @@
+// Copyright (c) 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/third_party/quiche/src/quic/core/batch_writer/quic_sendmmsg_batch_writer.h"
+
+namespace quic {
+
+QuicSendmmsgBatchWriter::QuicSendmmsgBatchWriter(
+ std::unique_ptr<QuicBatchWriterBuffer> batch_buffer,
+ int fd)
+ : QuicUdpBatchWriter(std::move(batch_buffer), fd) {}
+
+QuicSendmmsgBatchWriter::CanBatchResult QuicSendmmsgBatchWriter::CanBatch(
+ const char* buffer,
+ size_t buf_len,
+ const QuicIpAddress& self_address,
+ const QuicSocketAddress& peer_address,
+ const PerPacketOptions* options,
+ uint64_t release_time) const {
+ return CanBatchResult(/*can_batch=*/true, /*must_flush=*/false);
+}
+
+QuicSendmmsgBatchWriter::FlushImplResult QuicSendmmsgBatchWriter::FlushImpl() {
+ return InternalFlushImpl(
+ kCmsgSpaceForIp,
+ [](QuicMMsgHdr* mhdr, int i, const BufferedWrite& buffered_write) {
+ mhdr->SetIpInNextCmsg(i, buffered_write.self_address);
+ });
+}
+
+QuicSendmmsgBatchWriter::FlushImplResult
+QuicSendmmsgBatchWriter::InternalFlushImpl(size_t cmsg_space,
+ const CmsgBuilder& cmsg_builder) {
+ DCHECK(!IsWriteBlocked());
+ DCHECK(!buffered_writes().empty());
+
+ FlushImplResult result = {WriteResult(WRITE_STATUS_OK, 0),
+ /*num_packets_sent=*/0, /*bytes_written=*/0};
+ WriteResult& write_result = result.write_result;
+
+ auto first = buffered_writes().cbegin();
+ const auto last = buffered_writes().cend();
+ while (first != last) {
+ QuicMMsgHdr mhdr(first, last, cmsg_space, cmsg_builder);
+
+ int num_packets_sent;
+ write_result = QuicLinuxSocketUtils::WriteMultiplePackets(
+ fd(), &mhdr, &num_packets_sent);
+ QUIC_DVLOG(1) << "WriteMultiplePackets sent " << num_packets_sent
+ << " out of " << mhdr.num_msgs()
+ << " packets. WriteResult=" << write_result;
+
+ if (write_result.status != WRITE_STATUS_OK) {
+ DCHECK_EQ(0, num_packets_sent);
+ break;
+ } else if (num_packets_sent == 0) {
+ QUIC_BUG << "WriteMultiplePackets returned OK, but no packets were sent.";
+ write_result = WriteResult(WRITE_STATUS_ERROR, EIO);
+ break;
+ }
+
+ first += num_packets_sent;
+
+ result.num_packets_sent += num_packets_sent;
+ result.bytes_written += write_result.bytes_written;
+ }
+
+ // Call PopBufferedWrite() even if write_result.status is not WRITE_STATUS_OK,
+ // to deal with partial writes.
+ batch_buffer().PopBufferedWrite(result.num_packets_sent);
+
+ if (write_result.status != WRITE_STATUS_OK) {
+ return result;
+ }
+
+ QUIC_BUG_IF(!buffered_writes().empty())
+ << "All packets should have been written on a successful return";
+ write_result.bytes_written = result.bytes_written;
+ return result;
+}
+
+} // namespace quic
diff --git a/quic/core/batch_writer/quic_sendmmsg_batch_writer.h b/quic/core/batch_writer/quic_sendmmsg_batch_writer.h
new file mode 100644
index 0000000..26a728e
--- /dev/null
+++ b/quic/core/batch_writer/quic_sendmmsg_batch_writer.h
@@ -0,0 +1,35 @@
+// Copyright (c) 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_PLATFORM_IMPL_QUIC_SENDMMSG_BATCH_WRITER_H_
+#define QUICHE_QUIC_PLATFORM_IMPL_QUIC_SENDMMSG_BATCH_WRITER_H_
+
+#include "net/third_party/quiche/src/quic/core/batch_writer/quic_batch_writer_base.h"
+#include "net/third_party/quiche/src/quic/core/quic_linux_socket_utils.h"
+
+namespace quic {
+
+class QUIC_EXPORT_PRIVATE QuicSendmmsgBatchWriter : public QuicUdpBatchWriter {
+ public:
+ QuicSendmmsgBatchWriter(std::unique_ptr<QuicBatchWriterBuffer> batch_buffer,
+ int fd);
+
+ CanBatchResult CanBatch(const char* buffer,
+ size_t buf_len,
+ const QuicIpAddress& self_address,
+ const QuicSocketAddress& peer_address,
+ const PerPacketOptions* options,
+ uint64_t release_time) const override;
+
+ FlushImplResult FlushImpl() override;
+
+ protected:
+ typedef QuicMMsgHdr::ControlBufferInitializer CmsgBuilder;
+ FlushImplResult InternalFlushImpl(size_t cmsg_space,
+ const CmsgBuilder& cmsg_builder);
+};
+
+} // namespace quic
+
+#endif // QUICHE_QUIC_PLATFORM_IMPL_QUIC_SENDMMSG_BATCH_WRITER_H_
diff --git a/quic/core/batch_writer/quic_sendmmsg_batch_writer_test.cc b/quic/core/batch_writer/quic_sendmmsg_batch_writer_test.cc
new file mode 100644
index 0000000..4dcc33c
--- /dev/null
+++ b/quic/core/batch_writer/quic_sendmmsg_batch_writer_test.cc
@@ -0,0 +1,15 @@
+// Copyright (c) 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/third_party/quiche/src/quic/core/batch_writer/quic_sendmmsg_batch_writer.h"
+
+namespace quic {
+namespace test {
+namespace {
+
+// Add tests here.
+
+} // namespace
+} // namespace test
+} // namespace quic
diff --git a/quic/core/quic_linux_socket_utils.cc b/quic/core/quic_linux_socket_utils.cc
new file mode 100644
index 0000000..5100a92
--- /dev/null
+++ b/quic/core/quic_linux_socket_utils.cc
@@ -0,0 +1,314 @@
+// Copyright (c) 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/third_party/quiche/src/quic/core/quic_linux_socket_utils.h"
+
+#include <linux/net_tstamp.h>
+#include <netinet/in.h>
+#include <cstdint>
+
+#include "net/third_party/quiche/src/quic/core/quic_syscall_wrapper.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_ip_address.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_logging.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_socket_address.h"
+
+namespace quic {
+
+QuicMsgHdr::QuicMsgHdr(const char* buffer,
+ size_t buf_len,
+ const QuicSocketAddress& peer_address,
+ char* cbuf,
+ size_t cbuf_size)
+ : iov_{const_cast<char*>(buffer), buf_len},
+ cbuf_(cbuf),
+ cbuf_size_(cbuf_size),
+ cmsg_(nullptr) {
+ // Only support unconnected sockets.
+ DCHECK(peer_address.IsInitialized());
+
+ raw_peer_address_ = peer_address.generic_address();
+ hdr_.msg_name = &raw_peer_address_;
+ hdr_.msg_namelen = raw_peer_address_.ss_family == AF_INET
+ ? sizeof(sockaddr_in)
+ : sizeof(sockaddr_in6);
+
+ hdr_.msg_iov = &iov_;
+ hdr_.msg_iovlen = 1;
+ hdr_.msg_flags = 0;
+
+ hdr_.msg_control = nullptr;
+ hdr_.msg_controllen = 0;
+}
+
+void QuicMsgHdr::SetIpInNextCmsg(const QuicIpAddress& self_address) {
+ if (!self_address.IsInitialized()) {
+ return;
+ }
+
+ if (self_address.IsIPv4()) {
+ QuicLinuxSocketUtils::SetIpInfoInCmsgData(
+ self_address, GetNextCmsgData<in_pktinfo>(IPPROTO_IP, IP_PKTINFO));
+ } else {
+ QuicLinuxSocketUtils::SetIpInfoInCmsgData(
+ self_address, GetNextCmsgData<in6_pktinfo>(IPPROTO_IPV6, IPV6_PKTINFO));
+ }
+}
+
+void* QuicMsgHdr::GetNextCmsgDataInternal(int cmsg_level,
+ int cmsg_type,
+ size_t data_size) {
+ // msg_controllen needs to be increased first, otherwise CMSG_NXTHDR will
+ // return nullptr.
+ hdr_.msg_controllen += CMSG_SPACE(data_size);
+ DCHECK_LE(hdr_.msg_controllen, cbuf_size_);
+
+ if (cmsg_ == nullptr) {
+ DCHECK_EQ(nullptr, hdr_.msg_control);
+ memset(cbuf_, 0, cbuf_size_);
+ hdr_.msg_control = cbuf_;
+ cmsg_ = CMSG_FIRSTHDR(&hdr_);
+ } else {
+ DCHECK_NE(nullptr, hdr_.msg_control);
+ cmsg_ = CMSG_NXTHDR(&hdr_, cmsg_);
+ }
+
+ DCHECK_NE(nullptr, cmsg_) << "Insufficient control buffer space";
+
+ cmsg_->cmsg_len = CMSG_LEN(data_size);
+ cmsg_->cmsg_level = cmsg_level;
+ cmsg_->cmsg_type = cmsg_type;
+
+ return CMSG_DATA(cmsg_);
+}
+
+void QuicMMsgHdr::InitOneHeader(int i, const BufferedWrite& buffered_write) {
+ mmsghdr* mhdr = GetMMsgHdr(i);
+ msghdr* hdr = &mhdr->msg_hdr;
+ iovec* iov = GetIov(i);
+
+ iov->iov_base = const_cast<char*>(buffered_write.buffer);
+ iov->iov_len = buffered_write.buf_len;
+ hdr->msg_iov = iov;
+ hdr->msg_iovlen = 1;
+ hdr->msg_control = nullptr;
+ hdr->msg_controllen = 0;
+
+ // Only support unconnected sockets.
+ DCHECK(buffered_write.peer_address.IsInitialized());
+
+ sockaddr_storage* peer_address_storage = GetPeerAddressStorage(i);
+ *peer_address_storage = buffered_write.peer_address.generic_address();
+ hdr->msg_name = peer_address_storage;
+ hdr->msg_namelen = peer_address_storage->ss_family == AF_INET
+ ? sizeof(sockaddr_in)
+ : sizeof(sockaddr_in6);
+}
+
+void QuicMMsgHdr::SetIpInNextCmsg(int i, const QuicIpAddress& self_address) {
+ if (!self_address.IsInitialized()) {
+ return;
+ }
+
+ if (self_address.IsIPv4()) {
+ QuicLinuxSocketUtils::SetIpInfoInCmsgData(
+ self_address, GetNextCmsgData<in_pktinfo>(i, IPPROTO_IP, IP_PKTINFO));
+ } else {
+ QuicLinuxSocketUtils::SetIpInfoInCmsgData(
+ self_address,
+ GetNextCmsgData<in6_pktinfo>(i, IPPROTO_IPV6, IPV6_PKTINFO));
+ }
+}
+
+void* QuicMMsgHdr::GetNextCmsgDataInternal(int i,
+ int cmsg_level,
+ int cmsg_type,
+ size_t data_size) {
+ mmsghdr* mhdr = GetMMsgHdr(i);
+ msghdr* hdr = &mhdr->msg_hdr;
+ cmsghdr*& cmsg = *GetCmsgHdr(i);
+
+ // msg_controllen needs to be increased first, otherwise CMSG_NXTHDR will
+ // return nullptr.
+ hdr->msg_controllen += CMSG_SPACE(data_size);
+ DCHECK_LE(hdr->msg_controllen, cbuf_size_);
+
+ if (cmsg == nullptr) {
+ DCHECK_EQ(nullptr, hdr->msg_control);
+ hdr->msg_control = GetCbuf(i);
+ cmsg = CMSG_FIRSTHDR(hdr);
+ } else {
+ DCHECK_NE(nullptr, hdr->msg_control);
+ cmsg = CMSG_NXTHDR(hdr, cmsg);
+ }
+
+ DCHECK_NE(nullptr, cmsg) << "Insufficient control buffer space";
+
+ cmsg->cmsg_len = CMSG_LEN(data_size);
+ cmsg->cmsg_level = cmsg_level;
+ cmsg->cmsg_type = cmsg_type;
+
+ return CMSG_DATA(cmsg);
+}
+
+int QuicMMsgHdr::num_bytes_sent(int num_packets_sent) {
+ DCHECK_LE(0, num_packets_sent);
+ DCHECK_LE(num_packets_sent, num_msgs_);
+
+ int bytes_sent = 0;
+ iovec* iov = GetIov(0);
+ for (int i = 0; i < num_packets_sent; ++i) {
+ bytes_sent += iov[i].iov_len;
+ }
+ return bytes_sent;
+}
+
+// static
+int QuicLinuxSocketUtils::GetUDPSegmentSize(int fd) {
+ int optval;
+ socklen_t optlen = sizeof(optval);
+ int rc = getsockopt(fd, SOL_UDP, UDP_SEGMENT, &optval, &optlen);
+ if (rc < 0) {
+ QUIC_LOG_EVERY_N_SEC(INFO, 10)
+ << "getsockopt(UDP_SEGMENT) failed: " << strerror(errno);
+ return -1;
+ }
+ QUIC_LOG_EVERY_N_SEC(INFO, 10)
+ << "getsockopt(UDP_SEGMENT) returned segment size: " << optval;
+ return optval;
+}
+
+// static
+bool QuicLinuxSocketUtils::EnableReleaseTime(int fd, clockid_t clockid) {
+ // TODO(wub): Change to sock_txtime once it is available in linux/net_tstamp.h
+ struct LinuxSockTxTime {
+ clockid_t clockid; /* reference clockid */
+ uint32_t flags; /* flags defined by enum txtime_flags */
+ };
+
+ LinuxSockTxTime so_txtime_val = {.clockid = clockid};
+
+ if (setsockopt(fd, SOL_SOCKET, SO_TXTIME, &so_txtime_val,
+ sizeof(so_txtime_val)) != 0) {
+ QUIC_LOG_EVERY_N_SEC(INFO, 10)
+ << "setsockopt(SOL_SOCKET,SO_TXTIME) failed: " << strerror(errno);
+ return false;
+ }
+
+ return true;
+}
+
+// static
+bool QuicLinuxSocketUtils::GetTtlFromMsghdr(struct msghdr* hdr, int* ttl) {
+ if (hdr->msg_controllen > 0) {
+ struct cmsghdr* cmsg;
+ for (cmsg = CMSG_FIRSTHDR(hdr); cmsg != nullptr;
+ cmsg = CMSG_NXTHDR(hdr, cmsg)) {
+ if ((cmsg->cmsg_level == IPPROTO_IP && cmsg->cmsg_type == IP_TTL) ||
+ (cmsg->cmsg_level == IPPROTO_IPV6 &&
+ cmsg->cmsg_type == IPV6_HOPLIMIT)) {
+ *ttl = *(reinterpret_cast<int*>(CMSG_DATA(cmsg)));
+ return true;
+ }
+ }
+ }
+ return false;
+}
+
+// static
+void QuicLinuxSocketUtils::SetIpInfoInCmsgData(
+ const QuicIpAddress& self_address,
+ void* cmsg_data) {
+ DCHECK(self_address.IsInitialized());
+ const std::string& address_str = self_address.ToPackedString();
+ if (self_address.IsIPv4()) {
+ in_pktinfo* pktinfo = static_cast<in_pktinfo*>(cmsg_data);
+ pktinfo->ipi_ifindex = 0;
+ memcpy(&pktinfo->ipi_spec_dst, address_str.c_str(), address_str.length());
+ } else if (self_address.IsIPv6()) {
+ in6_pktinfo* pktinfo = static_cast<in6_pktinfo*>(cmsg_data);
+ memcpy(&pktinfo->ipi6_addr, address_str.c_str(), address_str.length());
+ } else {
+ QUIC_BUG << "Unrecognized IPAddress";
+ }
+}
+
+// static
+size_t QuicLinuxSocketUtils::SetIpInfoInCmsg(const QuicIpAddress& self_address,
+ cmsghdr* cmsg) {
+ std::string address_string;
+ if (self_address.IsIPv4()) {
+ cmsg->cmsg_len = CMSG_LEN(sizeof(in_pktinfo));
+ cmsg->cmsg_level = IPPROTO_IP;
+ cmsg->cmsg_type = IP_PKTINFO;
+ in_pktinfo* pktinfo = reinterpret_cast<in_pktinfo*>(CMSG_DATA(cmsg));
+ memset(pktinfo, 0, sizeof(in_pktinfo));
+ pktinfo->ipi_ifindex = 0;
+ address_string = self_address.ToPackedString();
+ memcpy(&pktinfo->ipi_spec_dst, address_string.c_str(),
+ address_string.length());
+ return sizeof(in_pktinfo);
+ } else if (self_address.IsIPv6()) {
+ cmsg->cmsg_len = CMSG_LEN(sizeof(in6_pktinfo));
+ cmsg->cmsg_level = IPPROTO_IPV6;
+ cmsg->cmsg_type = IPV6_PKTINFO;
+ in6_pktinfo* pktinfo = reinterpret_cast<in6_pktinfo*>(CMSG_DATA(cmsg));
+ memset(pktinfo, 0, sizeof(in6_pktinfo));
+ address_string = self_address.ToPackedString();
+ memcpy(&pktinfo->ipi6_addr, address_string.c_str(),
+ address_string.length());
+ return sizeof(in6_pktinfo);
+ } else {
+ QUIC_BUG << "Unrecognized IPAddress";
+ return 0;
+ }
+}
+
+// static
+WriteResult QuicLinuxSocketUtils::WritePacket(int fd, const QuicMsgHdr& hdr) {
+ int rc;
+ do {
+ rc = GetGlobalSyscallWrapper()->Sendmsg(fd, hdr.hdr(), 0);
+ } while (rc < 0 && errno == EINTR);
+ if (rc >= 0) {
+ return WriteResult(WRITE_STATUS_OK, rc);
+ }
+ return WriteResult((errno == EAGAIN || errno == EWOULDBLOCK)
+ ? WRITE_STATUS_BLOCKED
+ : WRITE_STATUS_ERROR,
+ errno);
+}
+
+// static
+WriteResult QuicLinuxSocketUtils::WriteMultiplePackets(int fd,
+ QuicMMsgHdr* mhdr,
+ int* num_packets_sent) {
+ *num_packets_sent = 0;
+
+ if (mhdr->num_msgs() <= 0) {
+ return WriteResult(WRITE_STATUS_ERROR, EINVAL);
+ }
+
+ int rc;
+ do {
+ rc = GetGlobalSyscallWrapper()->Sendmmsg(fd, mhdr->mhdr(), mhdr->num_msgs(),
+ 0);
+ } while (rc < 0 && errno == EINTR);
+
+ if (rc > 0) {
+ *num_packets_sent = rc;
+
+ return WriteResult(WRITE_STATUS_OK, mhdr->num_bytes_sent(rc));
+ } else if (rc == 0) {
+ QUIC_BUG << "sendmmsg returned 0, returning WRITE_STATUS_ERROR. errno: "
+ << errno;
+ errno = EIO;
+ }
+
+ return WriteResult((errno == EAGAIN || errno == EWOULDBLOCK)
+ ? WRITE_STATUS_BLOCKED
+ : WRITE_STATUS_ERROR,
+ errno);
+}
+
+} // namespace quic
diff --git a/quic/core/quic_linux_socket_utils.h b/quic/core/quic_linux_socket_utils.h
new file mode 100644
index 0000000..4091657
--- /dev/null
+++ b/quic/core/quic_linux_socket_utils.h
@@ -0,0 +1,298 @@
+// Copyright (c) 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_PLATFORM_IMPL_QUIC_LINUX_SOCKET_UTILS_H_
+#define QUICHE_QUIC_PLATFORM_IMPL_QUIC_LINUX_SOCKET_UTILS_H_
+
+#include <errno.h>
+#include <stddef.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/uio.h>
+#include <deque>
+#include <functional>
+#include <iterator>
+#include <memory>
+#include <type_traits>
+#include <utility>
+
+#include "net/third_party/quiche/src/quic/core/quic_packet_writer.h"
+#include "net/third_party/quiche/src/quic/core/quic_types.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_bug_tracker.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_ip_address.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_logging.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_socket_address.h"
+
+#ifndef SOL_UDP
+#define SOL_UDP 17
+#endif
+
+#ifndef UDP_SEGMENT
+#define UDP_SEGMENT 103
+#endif
+
+#ifndef UDP_MAX_SEGMENTS
+#define UDP_MAX_SEGMENTS (1 << 6UL)
+#endif
+
+#ifndef SO_TXTIME
+#define SO_TXTIME 61
+#endif
+
+namespace quic {
+
+const int kCmsgSpaceForIpv4 = CMSG_SPACE(sizeof(in_pktinfo));
+const int kCmsgSpaceForIpv6 = CMSG_SPACE(sizeof(in6_pktinfo));
+// kCmsgSpaceForIp should be big enough to hold both IPv4 and IPv6 packet info.
+const int kCmsgSpaceForIp = (kCmsgSpaceForIpv4 < kCmsgSpaceForIpv6)
+ ? kCmsgSpaceForIpv6
+ : kCmsgSpaceForIpv4;
+
+const int kCmsgSpaceForSegmentSize = CMSG_SPACE(sizeof(uint16_t));
+
+const int kCmsgSpaceForTxTime = CMSG_SPACE(sizeof(uint64_t));
+
+const int kCmsgSpaceForTTL = CMSG_SPACE(sizeof(int));
+
+// QuicMsgHdr is used to build msghdr objects that can be used send packets via
+// ::sendmsg.
+//
+// Example:
+// // cbuf holds control messages(cmsgs). The size is determined from what
+// // cmsgs will be set for this msghdr.
+// char cbuf[kCmsgSpaceForIp + kCmsgSpaceForSegmentSize];
+// QuicMsgHdr hdr(packet_buf, packet_buf_len, peer_addr, cbuf, sizeof(cbuf));
+//
+// // Set IP in cmsgs.
+// hdr.SetIpInNextCmsg(self_addr);
+//
+// // Set GSO size in cmsgs.
+// *hdr.GetNextCmsgData<uint16_t>(SOL_UDP, UDP_SEGMENT) = 1200;
+//
+// QuicLinuxSocketUtils::WritePacket(fd, hdr);
+class QUIC_EXPORT_PRIVATE QuicMsgHdr {
+ public:
+ QuicMsgHdr(const char* buffer,
+ size_t buf_len,
+ const QuicSocketAddress& peer_address,
+ char* cbuf,
+ size_t cbuf_size);
+
+ // Set IP info in the next cmsg. Both IPv4 and IPv6 are supported.
+ void SetIpInNextCmsg(const QuicIpAddress& self_address);
+
+ template <typename DataType>
+ DataType* GetNextCmsgData(int cmsg_level, int cmsg_type) {
+ return reinterpret_cast<DataType*>(
+ GetNextCmsgDataInternal(cmsg_level, cmsg_type, sizeof(DataType)));
+ }
+
+ const msghdr* hdr() const { return &hdr_; }
+
+ protected:
+ void* GetNextCmsgDataInternal(int cmsg_level,
+ int cmsg_type,
+ size_t data_size);
+
+ msghdr hdr_;
+ iovec iov_;
+ sockaddr_storage raw_peer_address_;
+ char* cbuf_;
+ const size_t cbuf_size_;
+ // The last cmsg populated so far. nullptr means nothing has been populated.
+ cmsghdr* cmsg_;
+};
+
+// BufferedWrite holds all information needed to send a packet.
+struct QUIC_EXPORT_PRIVATE BufferedWrite {
+ BufferedWrite(const char* buffer,
+ size_t buf_len,
+ const QuicIpAddress& self_address,
+ const QuicSocketAddress& peer_address)
+ : BufferedWrite(buffer,
+ buf_len,
+ self_address,
+ peer_address,
+ std::unique_ptr<PerPacketOptions>(),
+ /*release_time=*/0) {}
+
+ BufferedWrite(const char* buffer,
+ size_t buf_len,
+ const QuicIpAddress& self_address,
+ const QuicSocketAddress& peer_address,
+ std::unique_ptr<PerPacketOptions> options,
+ uint64_t release_time)
+ : buffer(buffer),
+ buf_len(buf_len),
+ self_address(self_address),
+ peer_address(peer_address),
+ options(std::move(options)),
+ release_time(release_time) {}
+
+ const char* buffer; // Not owned.
+ size_t buf_len;
+ QuicIpAddress self_address;
+ QuicSocketAddress peer_address;
+ std::unique_ptr<PerPacketOptions> options;
+
+ // The release time according to the owning packet writer's clock, which is
+ // often not a QuicClock. Calculated from packet writer's Now() and the
+ // release time delay in |options|.
+ // 0 means it can be sent at the same time as the previous packet in a batch,
+ // or can be sent Now() if this is the first packet of a batch.
+ uint64_t release_time;
+};
+
+// QuicMMsgHdr is used to build mmsghdr objects that can be used to send
+// multiple packets at once via ::sendmmsg.
+//
+// Example:
+// QuicCircularDeque<BufferedWrite> buffered_writes;
+// ... (Populate buffered_writes) ...
+//
+// QuicMMsgHdr mhdr(
+// buffered_writes.begin(), buffered_writes.end(), kCmsgSpaceForIp,
+// [](QuicMMsgHdr* mhdr, int i, const BufferedWrite& buffered_write) {
+// mhdr->SetIpInNextCmsg(i, buffered_write.self_address);
+// });
+//
+// int num_packets_sent;
+// QuicSocketUtils::WriteMultiplePackets(fd, &mhdr, &num_packets_sent);
+class QUIC_EXPORT_PRIVATE QuicMMsgHdr {
+ public:
+ typedef std::function<
+ void(QuicMMsgHdr* mhdr, int i, const BufferedWrite& buffered_write)>
+ ControlBufferInitializer;
+ template <typename IteratorT>
+ QuicMMsgHdr(const IteratorT& first,
+ const IteratorT& last,
+ size_t cbuf_size,
+ ControlBufferInitializer cbuf_initializer)
+ : num_msgs_(std::distance(first, last)), cbuf_size_(cbuf_size) {
+ static_assert(
+ std::is_same<typename std::iterator_traits<IteratorT>::value_type,
+ BufferedWrite>::value,
+ "Must iterate over a collection of BufferedWrite.");
+
+ DCHECK_LE(0, num_msgs_);
+ if (num_msgs_ == 0) {
+ return;
+ }
+
+ storage_.reset(new char[StorageSize()]);
+ memset(&storage_[0], 0, StorageSize());
+
+ int i = -1;
+ for (auto it = first; it != last; ++it) {
+ ++i;
+
+ InitOneHeader(i, *it);
+ if (cbuf_initializer) {
+ cbuf_initializer(this, i, *it);
+ }
+ }
+ }
+
+ void SetIpInNextCmsg(int i, const QuicIpAddress& self_address);
+
+ template <typename DataType>
+ DataType* GetNextCmsgData(int i, int cmsg_level, int cmsg_type) {
+ return reinterpret_cast<DataType*>(
+ GetNextCmsgDataInternal(i, cmsg_level, cmsg_type, sizeof(DataType)));
+ }
+
+ mmsghdr* mhdr() { return GetMMsgHdr(0); }
+
+ int num_msgs() const { return num_msgs_; }
+
+ // Get the total number of bytes in the first |num_packets_sent| packets.
+ int num_bytes_sent(int num_packets_sent);
+
+ protected:
+ void InitOneHeader(int i, const BufferedWrite& buffered_write);
+
+ void* GetNextCmsgDataInternal(int i,
+ int cmsg_level,
+ int cmsg_type,
+ size_t data_size);
+
+ size_t StorageSize() const {
+ return num_msgs_ *
+ (sizeof(mmsghdr) + sizeof(iovec) + sizeof(sockaddr_storage) +
+ sizeof(cmsghdr*) + cbuf_size_);
+ }
+
+ mmsghdr* GetMMsgHdr(int i) {
+ auto* first = reinterpret_cast<mmsghdr*>(&storage_[0]);
+ return &first[i];
+ }
+
+ iovec* GetIov(int i) {
+ auto* first = reinterpret_cast<iovec*>(GetMMsgHdr(num_msgs_));
+ return &first[i];
+ }
+
+ sockaddr_storage* GetPeerAddressStorage(int i) {
+ auto* first = reinterpret_cast<sockaddr_storage*>(GetIov(num_msgs_));
+ return &first[i];
+ }
+
+ cmsghdr** GetCmsgHdr(int i) {
+ auto* first = reinterpret_cast<cmsghdr**>(GetPeerAddressStorage(num_msgs_));
+ return &first[i];
+ }
+
+ char* GetCbuf(int i) {
+ auto* first = reinterpret_cast<char*>(GetCmsgHdr(num_msgs_));
+ return &first[i * cbuf_size_];
+ }
+
+ const int num_msgs_;
+ // Size of cmsg buffer for each message.
+ const size_t cbuf_size_;
+ // storage_ holds the memory of
+ // |num_msgs_| mmsghdr
+ // |num_msgs_| iovec
+ // |num_msgs_| sockaddr_storage, for peer addresses
+ // |num_msgs_| cmsghdr*
+ // |num_msgs_| cbuf, each of size cbuf_size
+ std::unique_ptr<char[]> storage_;
+};
+
+class QUIC_EXPORT_PRIVATE QuicLinuxSocketUtils {
+ public:
+ // Return the UDP segment size of |fd|, 0 means segment size has not been set
+ // on this socket. If GSO is not supported, return -1.
+ static int GetUDPSegmentSize(int fd);
+
+ // Enable release time on |fd|.
+ static bool EnableReleaseTime(int fd, clockid_t clockid);
+
+ // If the msghdr contains an IP_TTL entry, this will set ttl to the correct
+ // value and return true. Otherwise it will return false.
+ static bool GetTtlFromMsghdr(struct msghdr* hdr, int* ttl);
+
+ // Set IP(self_address) in |cmsg_data|. Does not touch other fields in the
+ // containing cmsghdr.
+ static void SetIpInfoInCmsgData(const QuicIpAddress& self_address,
+ void* cmsg_data);
+
+ // A helper for WritePacket which fills in the cmsg with the supplied self
+ // address.
+ // Returns the length of the packet info structure used.
+ static size_t SetIpInfoInCmsg(const QuicIpAddress& self_address,
+ cmsghdr* cmsg);
+
+ // Writes the packet in |hdr| to the socket, using ::sendmsg.
+ static WriteResult WritePacket(int fd, const QuicMsgHdr& hdr);
+
+ // Writes the packets in |mhdr| to the socket, using ::sendmmsg if available.
+ static WriteResult WriteMultiplePackets(int fd,
+ QuicMMsgHdr* mhdr,
+ int* num_packets_sent);
+};
+
+} // namespace quic
+
+#endif // QUICHE_QUIC_PLATFORM_IMPL_QUIC_LINUX_SOCKET_UTILS_H_
diff --git a/quic/core/quic_linux_socket_utils_test.cc b/quic/core/quic_linux_socket_utils_test.cc
new file mode 100644
index 0000000..32ad12f
--- /dev/null
+++ b/quic/core/quic_linux_socket_utils_test.cc
@@ -0,0 +1,329 @@
+// Copyright (c) 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/third_party/quiche/src/quic/core/quic_linux_socket_utils.h"
+
+#include <netinet/in.h>
+#include <stdint.h>
+#include <cstddef>
+#include <sstream>
+#include <vector>
+
+#include <string>
+
+#include "net/third_party/quiche/src/quic/core/quic_circular_deque.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_test.h"
+#include "net/third_party/quiche/src/quic/test_tools/quic_mock_syscall_wrapper.h"
+
+using testing::_;
+using testing::InSequence;
+using testing::Invoke;
+
+namespace quic {
+namespace test {
+namespace {
+
+class QuicLinuxSocketUtilsTest : public QuicTest {
+ protected:
+ WriteResult TestWriteMultiplePackets(
+ int fd,
+ const QuicCircularDeque<BufferedWrite>::const_iterator& first,
+ const QuicCircularDeque<BufferedWrite>::const_iterator& last,
+ int* num_packets_sent) {
+ QuicMMsgHdr mhdr(
+ first, last, kCmsgSpaceForIp,
+ [](QuicMMsgHdr* mhdr, int i, const BufferedWrite& buffered_write) {
+ mhdr->SetIpInNextCmsg(i, buffered_write.self_address);
+ });
+
+ WriteResult res =
+ QuicLinuxSocketUtils::WriteMultiplePackets(fd, &mhdr, num_packets_sent);
+ return res;
+ }
+
+ MockQuicSyscallWrapper mock_syscalls_;
+ ScopedGlobalSyscallWrapperOverride syscall_override_{&mock_syscalls_};
+};
+
+void CheckIpAndTtlInCbuf(msghdr* hdr,
+ const void* cbuf,
+ const QuicIpAddress& self_addr,
+ int ttl) {
+ const bool is_ipv4 = self_addr.IsIPv4();
+ const size_t ip_cmsg_space = is_ipv4 ? kCmsgSpaceForIpv4 : kCmsgSpaceForIpv6;
+
+ EXPECT_EQ(cbuf, hdr->msg_control);
+ EXPECT_EQ(ip_cmsg_space + CMSG_SPACE(sizeof(uint16_t)), hdr->msg_controllen);
+
+ cmsghdr* cmsg = CMSG_FIRSTHDR(hdr);
+ EXPECT_EQ(cmsg->cmsg_len, is_ipv4 ? CMSG_LEN(sizeof(in_pktinfo))
+ : CMSG_LEN(sizeof(in6_pktinfo)));
+ EXPECT_EQ(cmsg->cmsg_level, is_ipv4 ? IPPROTO_IP : IPPROTO_IPV6);
+ EXPECT_EQ(cmsg->cmsg_type, is_ipv4 ? IP_PKTINFO : IPV6_PKTINFO);
+
+ const std::string& self_addr_str = self_addr.ToPackedString();
+ if (is_ipv4) {
+ in_pktinfo* pktinfo = reinterpret_cast<in_pktinfo*>(CMSG_DATA(cmsg));
+ EXPECT_EQ(0, memcmp(&pktinfo->ipi_spec_dst, self_addr_str.c_str(),
+ self_addr_str.length()));
+ } else {
+ in6_pktinfo* pktinfo = reinterpret_cast<in6_pktinfo*>(CMSG_DATA(cmsg));
+ EXPECT_EQ(0, memcmp(&pktinfo->ipi6_addr, self_addr_str.c_str(),
+ self_addr_str.length()));
+ }
+
+ cmsg = CMSG_NXTHDR(hdr, cmsg);
+ EXPECT_EQ(cmsg->cmsg_len, CMSG_LEN(sizeof(int)));
+ EXPECT_EQ(cmsg->cmsg_level, is_ipv4 ? IPPROTO_IP : IPPROTO_IPV6);
+ EXPECT_EQ(cmsg->cmsg_type, is_ipv4 ? IP_TTL : IPV6_HOPLIMIT);
+ EXPECT_EQ(ttl, *reinterpret_cast<int*>(CMSG_DATA(cmsg)));
+
+ EXPECT_EQ(nullptr, CMSG_NXTHDR(hdr, cmsg));
+}
+
+void CheckMsghdrWithoutCbuf(const msghdr* hdr,
+ const void* buffer,
+ size_t buf_len,
+ const QuicSocketAddress& peer_addr) {
+ EXPECT_EQ(
+ peer_addr.host().IsIPv4() ? sizeof(sockaddr_in) : sizeof(sockaddr_in6),
+ hdr->msg_namelen);
+ sockaddr_storage peer_generic_addr = peer_addr.generic_address();
+ EXPECT_EQ(0, memcmp(hdr->msg_name, &peer_generic_addr, hdr->msg_namelen));
+ EXPECT_EQ(1u, hdr->msg_iovlen);
+ EXPECT_EQ(buffer, hdr->msg_iov->iov_base);
+ EXPECT_EQ(buf_len, hdr->msg_iov->iov_len);
+ EXPECT_EQ(0, hdr->msg_flags);
+ EXPECT_EQ(nullptr, hdr->msg_control);
+ EXPECT_EQ(0u, hdr->msg_controllen);
+}
+
+void CheckIpAndGsoSizeInCbuf(msghdr* hdr,
+ const void* cbuf,
+ const QuicIpAddress& self_addr,
+ uint16_t gso_size) {
+ const bool is_ipv4 = self_addr.IsIPv4();
+ const size_t ip_cmsg_space = is_ipv4 ? kCmsgSpaceForIpv4 : kCmsgSpaceForIpv6;
+
+ EXPECT_EQ(cbuf, hdr->msg_control);
+ EXPECT_EQ(ip_cmsg_space + CMSG_SPACE(sizeof(uint16_t)), hdr->msg_controllen);
+
+ cmsghdr* cmsg = CMSG_FIRSTHDR(hdr);
+ EXPECT_EQ(cmsg->cmsg_len, is_ipv4 ? CMSG_LEN(sizeof(in_pktinfo))
+ : CMSG_LEN(sizeof(in6_pktinfo)));
+ EXPECT_EQ(cmsg->cmsg_level, is_ipv4 ? IPPROTO_IP : IPPROTO_IPV6);
+ EXPECT_EQ(cmsg->cmsg_type, is_ipv4 ? IP_PKTINFO : IPV6_PKTINFO);
+
+ const std::string& self_addr_str = self_addr.ToPackedString();
+ if (is_ipv4) {
+ in_pktinfo* pktinfo = reinterpret_cast<in_pktinfo*>(CMSG_DATA(cmsg));
+ EXPECT_EQ(0, memcmp(&pktinfo->ipi_spec_dst, self_addr_str.c_str(),
+ self_addr_str.length()));
+ } else {
+ in6_pktinfo* pktinfo = reinterpret_cast<in6_pktinfo*>(CMSG_DATA(cmsg));
+ EXPECT_EQ(0, memcmp(&pktinfo->ipi6_addr, self_addr_str.c_str(),
+ self_addr_str.length()));
+ }
+
+ cmsg = CMSG_NXTHDR(hdr, cmsg);
+ EXPECT_EQ(cmsg->cmsg_len, CMSG_LEN(sizeof(uint16_t)));
+ EXPECT_EQ(cmsg->cmsg_level, SOL_UDP);
+ EXPECT_EQ(cmsg->cmsg_type, UDP_SEGMENT);
+ EXPECT_EQ(gso_size, *reinterpret_cast<uint16_t*>(CMSG_DATA(cmsg)));
+
+ EXPECT_EQ(nullptr, CMSG_NXTHDR(hdr, cmsg));
+}
+
+TEST_F(QuicLinuxSocketUtilsTest, QuicMsgHdr) {
+ QuicSocketAddress peer_addr(QuicIpAddress::Loopback4(), 1234);
+ char packet_buf[1024];
+
+ QuicMsgHdr quic_hdr(packet_buf, sizeof(packet_buf), peer_addr, nullptr, 0);
+ CheckMsghdrWithoutCbuf(quic_hdr.hdr(), packet_buf, sizeof(packet_buf),
+ peer_addr);
+
+ for (bool is_ipv4 : {true, false}) {
+ QuicIpAddress self_addr =
+ is_ipv4 ? QuicIpAddress::Loopback4() : QuicIpAddress::Loopback6();
+ char cbuf[kCmsgSpaceForIp + kCmsgSpaceForTTL];
+ QuicMsgHdr quic_hdr(packet_buf, sizeof(packet_buf), peer_addr, cbuf,
+ sizeof(cbuf));
+ msghdr* hdr = const_cast<msghdr*>(quic_hdr.hdr());
+
+ EXPECT_EQ(nullptr, hdr->msg_control);
+ EXPECT_EQ(0u, hdr->msg_controllen);
+
+ quic_hdr.SetIpInNextCmsg(self_addr);
+ EXPECT_EQ(cbuf, hdr->msg_control);
+ const size_t ip_cmsg_space =
+ is_ipv4 ? kCmsgSpaceForIpv4 : kCmsgSpaceForIpv6;
+ EXPECT_EQ(ip_cmsg_space, hdr->msg_controllen);
+
+ if (is_ipv4) {
+ *quic_hdr.GetNextCmsgData<int>(IPPROTO_IP, IP_TTL) = 32;
+ } else {
+ *quic_hdr.GetNextCmsgData<int>(IPPROTO_IPV6, IPV6_HOPLIMIT) = 32;
+ }
+
+ CheckIpAndTtlInCbuf(hdr, cbuf, self_addr, 32);
+ }
+}
+
+TEST_F(QuicLinuxSocketUtilsTest, QuicMMsgHdr) {
+ QuicCircularDeque<BufferedWrite> buffered_writes;
+ char packet_buf1[1024];
+ char packet_buf2[512];
+ buffered_writes.emplace_back(
+ packet_buf1, sizeof(packet_buf1), QuicIpAddress::Loopback4(),
+ QuicSocketAddress(QuicIpAddress::Loopback4(), 4));
+ buffered_writes.emplace_back(
+ packet_buf2, sizeof(packet_buf2), QuicIpAddress::Loopback6(),
+ QuicSocketAddress(QuicIpAddress::Loopback6(), 6));
+
+ QuicMMsgHdr quic_mhdr_without_cbuf(buffered_writes.begin(),
+ buffered_writes.end(), 0, nullptr);
+ for (size_t i = 0; i < buffered_writes.size(); ++i) {
+ const BufferedWrite& bw = buffered_writes[i];
+ CheckMsghdrWithoutCbuf(&quic_mhdr_without_cbuf.mhdr()[i].msg_hdr, bw.buffer,
+ bw.buf_len, bw.peer_address);
+ }
+
+ QuicMMsgHdr quic_mhdr_with_cbuf(
+ buffered_writes.begin(), buffered_writes.end(),
+ kCmsgSpaceForIp + kCmsgSpaceForSegmentSize,
+ [](QuicMMsgHdr* mhdr, int i, const BufferedWrite& buffered_write) {
+ mhdr->SetIpInNextCmsg(i, buffered_write.self_address);
+ *mhdr->GetNextCmsgData<uint16_t>(i, SOL_UDP, UDP_SEGMENT) = 1300;
+ });
+ for (size_t i = 0; i < buffered_writes.size(); ++i) {
+ const BufferedWrite& bw = buffered_writes[i];
+ msghdr* hdr = &quic_mhdr_with_cbuf.mhdr()[i].msg_hdr;
+ CheckIpAndGsoSizeInCbuf(hdr, hdr->msg_control, bw.self_address, 1300);
+ }
+}
+
+TEST_F(QuicLinuxSocketUtilsTest, WriteMultiplePackets_NoPacketsToSend) {
+ int num_packets_sent;
+ QuicCircularDeque<BufferedWrite> buffered_writes;
+
+ EXPECT_CALL(mock_syscalls_, Sendmmsg(_, _, _, _)).Times(0);
+
+ EXPECT_EQ(WriteResult(WRITE_STATUS_ERROR, EINVAL),
+ TestWriteMultiplePackets(1, buffered_writes.begin(),
+ buffered_writes.end(), &num_packets_sent));
+}
+
+TEST_F(QuicLinuxSocketUtilsTest, WriteMultiplePackets_WriteBlocked) {
+ int num_packets_sent;
+ QuicCircularDeque<BufferedWrite> buffered_writes;
+ buffered_writes.emplace_back(nullptr, 0, QuicIpAddress(),
+ QuicSocketAddress(QuicIpAddress::Any4(), 0));
+
+ EXPECT_CALL(mock_syscalls_, Sendmmsg(_, _, _, _))
+ .WillOnce(Invoke([](int /*fd*/, mmsghdr* /*msgvec*/,
+ unsigned int /*vlen*/, int /*flags*/) {
+ errno = EWOULDBLOCK;
+ return -1;
+ }));
+
+ EXPECT_EQ(WriteResult(WRITE_STATUS_BLOCKED, EWOULDBLOCK),
+ TestWriteMultiplePackets(1, buffered_writes.begin(),
+ buffered_writes.end(), &num_packets_sent));
+ EXPECT_EQ(0, num_packets_sent);
+}
+
+TEST_F(QuicLinuxSocketUtilsTest, WriteMultiplePackets_WriteError) {
+ int num_packets_sent;
+ QuicCircularDeque<BufferedWrite> buffered_writes;
+ buffered_writes.emplace_back(nullptr, 0, QuicIpAddress(),
+ QuicSocketAddress(QuicIpAddress::Any4(), 0));
+
+ EXPECT_CALL(mock_syscalls_, Sendmmsg(_, _, _, _))
+ .WillOnce(Invoke([](int /*fd*/, mmsghdr* /*msgvec*/,
+ unsigned int /*vlen*/, int /*flags*/) {
+ errno = EPERM;
+ return -1;
+ }));
+
+ EXPECT_EQ(WriteResult(WRITE_STATUS_ERROR, EPERM),
+ TestWriteMultiplePackets(1, buffered_writes.begin(),
+ buffered_writes.end(), &num_packets_sent));
+ EXPECT_EQ(0, num_packets_sent);
+}
+
+TEST_F(QuicLinuxSocketUtilsTest, WriteMultiplePackets_WriteSuccess) {
+ int num_packets_sent;
+ QuicCircularDeque<BufferedWrite> buffered_writes;
+ const int kNumBufferedWrites = 10;
+ static_assert(kNumBufferedWrites < 256, "Must be less than 256");
+ std::vector<std::string> buffer_holder;
+ for (int i = 0; i < kNumBufferedWrites; ++i) {
+ size_t buf_len = (i + 1) * 2;
+ std::ostringstream buffer_ostream;
+ while (buffer_ostream.str().length() < buf_len) {
+ buffer_ostream << i;
+ }
+ buffer_holder.push_back(buffer_ostream.str().substr(0, buf_len - 1) + '$');
+
+ buffered_writes.emplace_back(buffer_holder.back().data(), buf_len,
+ QuicIpAddress(),
+ QuicSocketAddress(QuicIpAddress::Any4(), 0));
+
+ // Leave the first self_address uninitialized.
+ if (i != 0) {
+ ASSERT_TRUE(buffered_writes.back().self_address.FromString("127.0.0.1"));
+ }
+
+ std::ostringstream peer_ip_ostream;
+ QuicIpAddress peer_ip_address;
+ peer_ip_ostream << "127.0.1." << i + 1;
+ ASSERT_TRUE(peer_ip_address.FromString(peer_ip_ostream.str()));
+ buffered_writes.back().peer_address =
+ QuicSocketAddress(peer_ip_address, i + 1);
+ }
+
+ InSequence s;
+
+ for (int expected_num_packets_sent : {1, 2, 3, 10}) {
+ SCOPED_TRACE(testing::Message()
+ << "expected_num_packets_sent=" << expected_num_packets_sent);
+ EXPECT_CALL(mock_syscalls_, Sendmmsg(_, _, _, _))
+ .WillOnce(Invoke(
+ [&](int /*fd*/, mmsghdr* msgvec, unsigned int vlen, int /*flags*/) {
+ EXPECT_LE(static_cast<unsigned int>(expected_num_packets_sent),
+ vlen);
+ for (unsigned int i = 0; i < vlen; ++i) {
+ const BufferedWrite& buffered_write = buffered_writes[i];
+ const msghdr& hdr = msgvec[i].msg_hdr;
+ EXPECT_EQ(1u, hdr.msg_iovlen);
+ EXPECT_EQ(buffered_write.buffer, hdr.msg_iov->iov_base);
+ EXPECT_EQ(buffered_write.buf_len, hdr.msg_iov->iov_len);
+ sockaddr_storage expected_peer_address =
+ buffered_write.peer_address.generic_address();
+ EXPECT_EQ(0, memcmp(&expected_peer_address, hdr.msg_name,
+ sizeof(sockaddr_storage)));
+ EXPECT_EQ(buffered_write.self_address.IsInitialized(),
+ hdr.msg_control != nullptr);
+ }
+ return expected_num_packets_sent;
+ }))
+ .RetiresOnSaturation();
+
+ int expected_bytes_written = 0;
+ for (auto it = buffered_writes.cbegin();
+ it != buffered_writes.cbegin() + expected_num_packets_sent; ++it) {
+ expected_bytes_written += it->buf_len;
+ }
+
+ EXPECT_EQ(
+ WriteResult(WRITE_STATUS_OK, expected_bytes_written),
+ TestWriteMultiplePackets(1, buffered_writes.cbegin(),
+ buffered_writes.cend(), &num_packets_sent));
+ EXPECT_EQ(expected_num_packets_sent, num_packets_sent);
+ }
+}
+
+} // namespace
+} // namespace test
+} // namespace quic
diff --git a/quic/core/quic_syscall_wrapper.cc b/quic/core/quic_syscall_wrapper.cc
new file mode 100644
index 0000000..b2404c6
--- /dev/null
+++ b/quic/core/quic_syscall_wrapper.cc
@@ -0,0 +1,49 @@
+// Copyright (c) 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/third_party/quiche/src/quic/core/quic_syscall_wrapper.h"
+
+#include <atomic>
+#include <cerrno>
+
+namespace quic {
+namespace {
+std::atomic<QuicSyscallWrapper*> global_syscall_wrapper(new QuicSyscallWrapper);
+} // namespace
+
+ssize_t QuicSyscallWrapper::Sendmsg(int sockfd, const msghdr* msg, int flags) {
+ return ::sendmsg(sockfd, msg, flags);
+}
+
+int QuicSyscallWrapper::Sendmmsg(int sockfd,
+ mmsghdr* msgvec,
+ unsigned int vlen,
+ int flags) {
+#if defined(__linux__) && !defined(__ANDROID__)
+ return ::sendmmsg(sockfd, msgvec, vlen, flags);
+#else
+ errno = ENOSYS;
+ return -1;
+#endif
+}
+
+QuicSyscallWrapper* GetGlobalSyscallWrapper() {
+ return global_syscall_wrapper.load();
+}
+
+void SetGlobalSyscallWrapper(QuicSyscallWrapper* wrapper) {
+ global_syscall_wrapper.store(wrapper);
+}
+
+ScopedGlobalSyscallWrapperOverride::ScopedGlobalSyscallWrapperOverride(
+ QuicSyscallWrapper* wrapper_in_scope)
+ : original_wrapper_(GetGlobalSyscallWrapper()) {
+ SetGlobalSyscallWrapper(wrapper_in_scope);
+}
+
+ScopedGlobalSyscallWrapperOverride::~ScopedGlobalSyscallWrapperOverride() {
+ SetGlobalSyscallWrapper(original_wrapper_);
+}
+
+} // namespace quic
diff --git a/quic/core/quic_syscall_wrapper.h b/quic/core/quic_syscall_wrapper.h
new file mode 100644
index 0000000..3a80ac6
--- /dev/null
+++ b/quic/core/quic_syscall_wrapper.h
@@ -0,0 +1,49 @@
+// Copyright (c) 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_PLATFORM_IMPL_QUIC_SYSCALL_WRAPPER_H_
+#define QUICHE_QUIC_PLATFORM_IMPL_QUIC_SYSCALL_WRAPPER_H_
+
+#include <sys/socket.h>
+#include <sys/types.h>
+
+#include "net/third_party/quiche/src/quic/platform/api/quic_export.h"
+
+struct mmsghdr;
+namespace quic {
+
+// QuicSyscallWrapper is a pass-through proxy to the real syscalls.
+class QUIC_EXPORT_PRIVATE QuicSyscallWrapper {
+ public:
+ virtual ~QuicSyscallWrapper() = default;
+
+ virtual ssize_t Sendmsg(int sockfd, const msghdr* msg, int flags);
+
+ virtual int Sendmmsg(int sockfd,
+ mmsghdr* msgvec,
+ unsigned int vlen,
+ int flags);
+};
+
+// A global instance of QuicSyscallWrapper, used by some socket util functions.
+QuicSyscallWrapper* GetGlobalSyscallWrapper();
+
+// Change the global QuicSyscallWrapper to |wrapper|, for testing.
+void SetGlobalSyscallWrapper(QuicSyscallWrapper* wrapper);
+
+// ScopedGlobalSyscallWrapperOverride changes the global QuicSyscallWrapper
+// during its lifetime, for testing.
+class QUIC_EXPORT_PRIVATE ScopedGlobalSyscallWrapperOverride {
+ public:
+ explicit ScopedGlobalSyscallWrapperOverride(
+ QuicSyscallWrapper* wrapper_in_scope);
+ ~ScopedGlobalSyscallWrapperOverride();
+
+ private:
+ QuicSyscallWrapper* original_wrapper_;
+};
+
+} // namespace quic
+
+#endif // QUICHE_QUIC_PLATFORM_IMPL_QUIC_SYSCALL_WRAPPER_H_
diff --git a/quic/test_tools/quic_mock_syscall_wrapper.cc b/quic/test_tools/quic_mock_syscall_wrapper.cc
new file mode 100644
index 0000000..3cce97e
--- /dev/null
+++ b/quic/test_tools/quic_mock_syscall_wrapper.cc
@@ -0,0 +1,22 @@
+// Copyright (c) 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/third_party/quiche/src/quic/test_tools/quic_mock_syscall_wrapper.h"
+
+using testing::_;
+using testing::Invoke;
+
+namespace quic {
+namespace test {
+
+MockQuicSyscallWrapper::MockQuicSyscallWrapper(QuicSyscallWrapper* delegate) {
+ ON_CALL(*this, Sendmsg(_, _, _))
+ .WillByDefault(Invoke(delegate, &QuicSyscallWrapper::Sendmsg));
+
+ ON_CALL(*this, Sendmmsg(_, _, _, _))
+ .WillByDefault(Invoke(delegate, &QuicSyscallWrapper::Sendmmsg));
+}
+
+} // namespace test
+} // namespace quic
diff --git a/quic/test_tools/quic_mock_syscall_wrapper.h b/quic/test_tools/quic_mock_syscall_wrapper.h
new file mode 100644
index 0000000..8dac5ad
--- /dev/null
+++ b/quic/test_tools/quic_mock_syscall_wrapper.h
@@ -0,0 +1,37 @@
+// Copyright (c) 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_PLATFORM_IMPL_QUIC_MOCK_SYSCALL_WRAPPER_H_
+#define QUICHE_QUIC_PLATFORM_IMPL_QUIC_MOCK_SYSCALL_WRAPPER_H_
+
+#include "net/third_party/quiche/src/quic/core/quic_syscall_wrapper.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_test.h"
+
+namespace quic {
+namespace test {
+
+class MockQuicSyscallWrapper : public QuicSyscallWrapper {
+ public:
+ // Create a standard mock object.
+ MockQuicSyscallWrapper() = default;
+
+ // Create a 'mockable' object that delegates everything to |delegate| by
+ // default.
+ explicit MockQuicSyscallWrapper(QuicSyscallWrapper* delegate);
+
+ MOCK_METHOD(ssize_t,
+ Sendmsg,
+ (int sockfd, const msghdr*, int flags),
+ (override));
+
+ MOCK_METHOD(int,
+ Sendmmsg,
+ (int sockfd, mmsghdr*, unsigned int vlen, int flags),
+ (override));
+};
+
+} // namespace test
+} // namespace quic
+
+#endif // QUICHE_QUIC_PLATFORM_IMPL_QUIC_MOCK_SYSCALL_WRAPPER_H_