(n/a) open source quic gso batch writer. not protected. To chromium code merger: do not add these files to BUILD.gn PiperOrigin-RevId: 315328138 Change-Id: I4dc9d2a682659dfb8e04b8bdaa6c1423995f0ada
diff --git a/quic/core/batch_writer/quic_batch_writer_base.cc b/quic/core/batch_writer/quic_batch_writer_base.cc new file mode 100644 index 0000000..919f76b --- /dev/null +++ b/quic/core/batch_writer/quic_batch_writer_base.cc
@@ -0,0 +1,177 @@ +// Copyright (c) 2019 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/third_party/quiche/src/quic/core/batch_writer/quic_batch_writer_base.h" +#include <cstdint> + +#include "net/third_party/quiche/src/quic/platform/api/quic_export.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_flags.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_ptr_util.h" + +namespace quic { + +QuicBatchWriterBase::QuicBatchWriterBase( + std::unique_ptr<QuicBatchWriterBuffer> batch_buffer) + : write_blocked_(false), batch_buffer_(std::move(batch_buffer)) {} + +WriteResult QuicBatchWriterBase::WritePacket( + const char* buffer, + size_t buf_len, + const QuicIpAddress& self_address, + const QuicSocketAddress& peer_address, + PerPacketOptions* options) { + const WriteResult result = + InternalWritePacket(buffer, buf_len, self_address, peer_address, options); + if (result.status == WRITE_STATUS_BLOCKED) { + write_blocked_ = true; + } + return result; +} + +uint64_t QuicBatchWriterBase::GetReleaseTime( + const PerPacketOptions* options) const { + DCHECK(SupportsReleaseTime()); + + if (options == nullptr) { + return 0; + } + + if ((options->release_time_delay.IsZero() || options->allow_burst) && + !buffered_writes().empty()) { + // Send as soon as possible, but no sooner than the last buffered packet. + return buffered_writes().back().release_time; + } + + // Send according to the release time delay. + return NowInNanosForReleaseTime() + + options->release_time_delay.ToMicroseconds() * 1000; +} + +WriteResult QuicBatchWriterBase::InternalWritePacket( + const char* buffer, + size_t buf_len, + const QuicIpAddress& self_address, + const QuicSocketAddress& peer_address, + PerPacketOptions* options) { + if (buf_len > kMaxOutgoingPacketSize) { + return WriteResult(WRITE_STATUS_MSG_TOO_BIG, EMSGSIZE); + } + + uint64_t release_time = SupportsReleaseTime() ? GetReleaseTime(options) : 0; + + const CanBatchResult can_batch_result = CanBatch( + buffer, buf_len, self_address, peer_address, options, release_time); + + bool buffered = false; + bool flush = can_batch_result.must_flush; + + if (can_batch_result.can_batch) { + QuicBatchWriterBuffer::PushResult push_result = + batch_buffer_->PushBufferedWrite(buffer, buf_len, self_address, + peer_address, options, release_time); + if (push_result.succeeded) { + buffered = true; + // If there's no space left after the packet is buffered, force a flush. + flush = flush || (batch_buffer_->GetNextWriteLocation() == nullptr); + } else { + // If there's no space without this packet, force a flush. + flush = true; + } + } + + if (!flush) { + return WriteResult(WRITE_STATUS_OK, 0); + } + + size_t num_buffered_packets = buffered_writes().size(); + const FlushImplResult flush_result = CheckedFlush(); + const WriteResult& result = flush_result.write_result; + QUIC_DVLOG(1) << "Internally flushed " << flush_result.num_packets_sent + << " out of " << num_buffered_packets + << " packets. WriteResult=" << result; + + if (result.status != WRITE_STATUS_OK) { + if (IsWriteBlockedStatus(result.status)) { + return WriteResult( + buffered ? WRITE_STATUS_BLOCKED_DATA_BUFFERED : WRITE_STATUS_BLOCKED, + result.error_code); + } + + // Drop all packets, including the one being written. + size_t dropped_packets = + buffered ? buffered_writes().size() : buffered_writes().size() + 1; + + batch_buffer().Clear(); + WriteResult result_with_dropped = result; + result_with_dropped.dropped_packets = + dropped_packets > std::numeric_limits<uint16_t>::max() + ? std::numeric_limits<uint16_t>::max() + : static_cast<uint16_t>(dropped_packets); + return result_with_dropped; + } + + if (!buffered) { + QuicBatchWriterBuffer::PushResult push_result = + batch_buffer_->PushBufferedWrite(buffer, buf_len, self_address, + peer_address, options, release_time); + buffered = push_result.succeeded; + + // Since buffered_writes has been emptied, this write must have been + // buffered successfully. + QUIC_BUG_IF(!buffered) << "Failed to push to an empty batch buffer." + << " self_addr:" << self_address.ToString() + << ", peer_addr:" << peer_address.ToString() + << ", buf_len:" << buf_len; + } + + return result; +} + +QuicBatchWriterBase::FlushImplResult QuicBatchWriterBase::CheckedFlush() { + if (buffered_writes().empty()) { + return FlushImplResult{WriteResult(WRITE_STATUS_OK, 0), + /*num_packets_sent=*/0, /*bytes_written=*/0}; + } + + const FlushImplResult flush_result = FlushImpl(); + + // Either flush_result.write_result.status is not WRITE_STATUS_OK, or it is + // WRITE_STATUS_OK and batch_buffer is empty. + DCHECK(flush_result.write_result.status != WRITE_STATUS_OK || + buffered_writes().empty()); + + // Flush should never return WRITE_STATUS_BLOCKED_DATA_BUFFERED. + DCHECK(flush_result.write_result.status != + WRITE_STATUS_BLOCKED_DATA_BUFFERED); + + return flush_result; +} + +WriteResult QuicBatchWriterBase::Flush() { + size_t num_buffered_packets = buffered_writes().size(); + FlushImplResult flush_result = CheckedFlush(); + QUIC_DVLOG(1) << "Externally flushed " << flush_result.num_packets_sent + << " out of " << num_buffered_packets + << " packets. WriteResult=" << flush_result.write_result; + + if (IsWriteError(flush_result.write_result.status)) { + if (buffered_writes().size() > std::numeric_limits<uint16_t>::max()) { + flush_result.write_result.dropped_packets = + std::numeric_limits<uint16_t>::max(); + } else { + flush_result.write_result.dropped_packets = + static_cast<uint16_t>(buffered_writes().size()); + } + // Treat all errors as non-retryable fatal errors. Drop all buffered packets + // to avoid sending them and getting the same error again. + batch_buffer().Clear(); + } + + if (flush_result.write_result.status == WRITE_STATUS_BLOCKED) { + write_blocked_ = true; + } + return flush_result.write_result; +} + +} // namespace quic
diff --git a/quic/core/batch_writer/quic_batch_writer_base.h b/quic/core/batch_writer/quic_batch_writer_base.h new file mode 100644 index 0000000..0213894 --- /dev/null +++ b/quic/core/batch_writer/quic_batch_writer_base.h
@@ -0,0 +1,149 @@ +// Copyright (c) 2019 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef QUICHE_QUIC_PLATFORM_IMPL_BATCH_WRITER_QUIC_BATCH_WRITER_BASE_H_ +#define QUICHE_QUIC_PLATFORM_IMPL_BATCH_WRITER_QUIC_BATCH_WRITER_BASE_H_ + +#include <cstdint> +#include "net/third_party/quiche/src/quic/core/batch_writer/quic_batch_writer_buffer.h" +#include "net/third_party/quiche/src/quic/core/quic_packet_writer.h" +#include "net/third_party/quiche/src/quic/core/quic_types.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_ip_address.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_socket_address.h" + +namespace quic { + +// QuicBatchWriterBase implements logic common to all derived batch writers, +// including maintaining write blockage state and a skeleton implemention of +// WritePacket(). +// A derived batch writer must override the FlushImpl() function to send all +// buffered writes in a batch. It must also override the CanBatch() function +// to control whether/when a WritePacket() call should flush. +class QUIC_EXPORT_PRIVATE QuicBatchWriterBase : public QuicPacketWriter { + public: + explicit QuicBatchWriterBase( + std::unique_ptr<QuicBatchWriterBuffer> batch_buffer); + + // ATTENTION: If this write triggered a flush, and the flush failed, all + // buffered packets will be dropped to allow the next write to work. The + // number of dropped packets can be found in WriteResult.dropped_packets. + WriteResult WritePacket(const char* buffer, + size_t buf_len, + const QuicIpAddress& self_address, + const QuicSocketAddress& peer_address, + PerPacketOptions* options) override; + + bool IsWriteBlocked() const final { return write_blocked_; } + + void SetWritable() final { write_blocked_ = false; } + + QuicByteCount GetMaxPacketSize( + const QuicSocketAddress& peer_address) const final { + return kMaxOutgoingPacketSize; + } + + bool SupportsReleaseTime() const { return false; } + + bool IsBatchMode() const final { return true; } + + QuicPacketBuffer GetNextWriteLocation( + const QuicIpAddress& self_address, + const QuicSocketAddress& peer_address) final { + // No need to explicitly delete QuicBatchWriterBuffer. + return {batch_buffer_->GetNextWriteLocation(), nullptr}; + } + + WriteResult Flush() final; + + protected: + const QuicBatchWriterBuffer& batch_buffer() const { return *batch_buffer_; } + QuicBatchWriterBuffer& batch_buffer() { return *batch_buffer_; } + + const QuicCircularDeque<BufferedWrite>& buffered_writes() const { + return batch_buffer_->buffered_writes(); + } + + // Get the current time in nanos which is understood by the sending api for + // releasing packets in the future. + virtual uint64_t NowInNanosForReleaseTime() const { + DCHECK(false) << "Should not be called since release time is unsupported."; + return 0; + } + + // Given the release delay in |options| and the state of |batch_buffer_|, get + // the absolute release time. + uint64_t GetReleaseTime(const PerPacketOptions* options) const; + + struct QUIC_EXPORT_PRIVATE CanBatchResult { + CanBatchResult(bool can_batch, bool must_flush) + : can_batch(can_batch), must_flush(must_flush) {} + // Whether this write can be batched with existing buffered writes. + bool can_batch; + // If |can_batch|, whether the caller must flush after this packet is + // buffered. + // Always true if not |can_batch|. + bool must_flush; + }; + + // Given the existing buffered writes(in buffered_writes()), whether a new + // write(in the arguments) can be batched. + virtual CanBatchResult CanBatch(const char* buffer, + size_t buf_len, + const QuicIpAddress& self_address, + const QuicSocketAddress& peer_address, + const PerPacketOptions* options, + uint64_t release_time) const = 0; + + struct QUIC_EXPORT_PRIVATE FlushImplResult { + // The return value of the Flush() interface, which is: + // - WriteResult(WRITE_STATUS_OK, <bytes_flushed>) if all buffered writes + // were sent successfully. + // - WRITE_STATUS_BLOCKED or WRITE_STATUS_ERROR, if the batch write is + // blocked or returned an error while sending. If a portion of buffered + // writes were sent successfully, |FlushImplResult.num_packets_sent| and + // |FlushImplResult.bytes_written| contain the number of successfully sent + // packets and their total bytes. + WriteResult write_result; + int num_packets_sent; + // If write_result.status == WRITE_STATUS_OK, |bytes_written| will be equal + // to write_result.bytes_written. Otherwise |bytes_written| will be the + // number of bytes written before WRITE_BLOCK or WRITE_ERROR happened. + int bytes_written; + }; + + // Send all buffered writes(in buffered_writes()) in a batch. + // buffered_writes() is guaranteed to be non-empty when this function is + // called. + virtual FlushImplResult FlushImpl() = 0; + + private: + WriteResult InternalWritePacket(const char* buffer, + size_t buf_len, + const QuicIpAddress& self_address, + const QuicSocketAddress& peer_address, + PerPacketOptions* options); + + // Calls FlushImpl() and check its post condition. + FlushImplResult CheckedFlush(); + + bool write_blocked_; + std::unique_ptr<QuicBatchWriterBuffer> batch_buffer_; +}; + +// QuicUdpBatchWriter is a batch writer backed by a UDP socket. +class QUIC_EXPORT_PRIVATE QuicUdpBatchWriter : public QuicBatchWriterBase { + public: + QuicUdpBatchWriter(std::unique_ptr<QuicBatchWriterBuffer> batch_buffer, + int fd) + : QuicBatchWriterBase(std::move(batch_buffer)), fd_(fd) {} + + int fd() const { return fd_; } + + private: + const int fd_; +}; + +} // namespace quic + +#endif // QUICHE_QUIC_PLATFORM_IMPL_BATCH_WRITER_QUIC_BATCH_WRITER_BASE_H_
diff --git a/quic/core/batch_writer/quic_batch_writer_buffer.cc b/quic/core/batch_writer/quic_batch_writer_buffer.cc new file mode 100644 index 0000000..6226139 --- /dev/null +++ b/quic/core/batch_writer/quic_batch_writer_buffer.cc
@@ -0,0 +1,154 @@ +// Copyright (c) 2019 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/third_party/quiche/src/quic/core/batch_writer/quic_batch_writer_buffer.h" + +#include <sstream> + +namespace quic { + +QuicBatchWriterBuffer::QuicBatchWriterBuffer() { + memset(buffer_, 0, sizeof(buffer_)); +} + +void QuicBatchWriterBuffer::Clear() { + buffered_writes_.clear(); +} + +std::string QuicBatchWriterBuffer::DebugString() const { + std::ostringstream os; + os << "{ buffer: " << static_cast<const void*>(buffer_) + << " buffer_end: " << static_cast<const void*>(buffer_end()) + << " buffered_writes_.size(): " << buffered_writes_.size() + << " next_write_loc: " << static_cast<const void*>(GetNextWriteLocation()) + << " SizeInUse: " << SizeInUse() << " }"; + return os.str(); +} + +bool QuicBatchWriterBuffer::Invariants() const { + // Buffers in buffered_writes_ should not overlap, and collectively they + // should cover a continuous prefix of buffer_. + const char* next_buffer = buffer_; + for (auto iter = buffered_writes_.begin(); iter != buffered_writes_.end(); + ++iter) { + if ((iter->buffer != next_buffer) || + (iter->buffer + iter->buf_len > buffer_end())) { + return false; + } + next_buffer += iter->buf_len; + } + + return (next_buffer - buffer_) == SizeInUse(); +} + +char* QuicBatchWriterBuffer::GetNextWriteLocation() const { + const char* next_loc = + buffered_writes_.empty() + ? buffer_ + : buffered_writes_.back().buffer + buffered_writes_.back().buf_len; + if (buffer_end() - next_loc < kMaxOutgoingPacketSize) { + return nullptr; + } + return const_cast<char*>(next_loc); +} + +QuicBatchWriterBuffer::PushResult QuicBatchWriterBuffer::PushBufferedWrite( + const char* buffer, + size_t buf_len, + const QuicIpAddress& self_address, + const QuicSocketAddress& peer_address, + const PerPacketOptions* options, + uint64_t release_time) { + DCHECK(Invariants()); + DCHECK_LE(buf_len, kMaxOutgoingPacketSize); + + PushResult result = {/*succeeded=*/false, /*buffer_copied=*/false}; + char* next_write_location = GetNextWriteLocation(); + if (next_write_location == nullptr) { + return result; + } + + if (buffer != next_write_location) { + if (IsExternalBuffer(buffer, buf_len)) { + memcpy(next_write_location, buffer, buf_len); + } else if (IsInternalBuffer(buffer, buf_len)) { + memmove(next_write_location, buffer, buf_len); + } else { + QUIC_BUG << "Buffer[" << static_cast<const void*>(buffer) << ", " + << static_cast<const void*>(buffer + buf_len) + << ") overlaps with internal buffer[" + << static_cast<const void*>(buffer_) << ", " + << static_cast<const void*>(buffer_end()) << ")"; + return result; + } + result.buffer_copied = true; + } else { + // In place push, do nothing. + } + buffered_writes_.emplace_back( + next_write_location, buf_len, self_address, peer_address, + options ? options->Clone() : std::unique_ptr<PerPacketOptions>(), + release_time); + + DCHECK(Invariants()); + + result.succeeded = true; + return result; +} + +void QuicBatchWriterBuffer::UndoLastPush() { + if (!buffered_writes_.empty()) { + buffered_writes_.pop_back(); + } +} + +QuicBatchWriterBuffer::PopResult QuicBatchWriterBuffer::PopBufferedWrite( + int32_t num_buffered_writes) { + DCHECK(Invariants()); + DCHECK_GE(num_buffered_writes, 0); + DCHECK_LE(num_buffered_writes, buffered_writes_.size()); + + PopResult result = {/*num_buffers_popped=*/0, + /*moved_remaining_buffers=*/false}; + + result.num_buffers_popped = std::max<int32_t>(num_buffered_writes, 0); + result.num_buffers_popped = + std::min<int32_t>(result.num_buffers_popped, buffered_writes_.size()); + buffered_writes_.pop_front_n(result.num_buffers_popped); + + if (!buffered_writes_.empty()) { + // If not all buffered writes are erased, the remaining ones will not cover + // a continuous prefix of buffer_. We'll fix it by moving the remaining + // buffers to the beginning of buffer_ and adjust the buffer pointers in all + // remaining buffered writes. + // This should happen very rarely, about once per write block. + result.moved_remaining_buffers = true; + const char* buffer_before_move = buffered_writes_.front().buffer; + size_t buffer_len_to_move = buffered_writes_.back().buffer + + buffered_writes_.back().buf_len - + buffer_before_move; + memmove(buffer_, buffer_before_move, buffer_len_to_move); + + size_t distance_to_move = buffer_before_move - buffer_; + for (BufferedWrite& buffered_write : buffered_writes_) { + buffered_write.buffer -= distance_to_move; + } + + DCHECK_EQ(buffer_, buffered_writes_.front().buffer); + } + DCHECK(Invariants()); + + return result; +} + +size_t QuicBatchWriterBuffer::SizeInUse() const { + if (buffered_writes_.empty()) { + return 0; + } + + return buffered_writes_.back().buffer + buffered_writes_.back().buf_len - + buffer_; +} + +} // namespace quic
diff --git a/quic/core/batch_writer/quic_batch_writer_buffer.h b/quic/core/batch_writer/quic_batch_writer_buffer.h new file mode 100644 index 0000000..a441ec3 --- /dev/null +++ b/quic/core/batch_writer/quic_batch_writer_buffer.h
@@ -0,0 +1,95 @@ +// Copyright (c) 2019 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef QUICHE_QUIC_PLATFORM_IMPL_BATCH_WRITER_QUIC_BATCH_WRITER_BUFFER_H_ +#define QUICHE_QUIC_PLATFORM_IMPL_BATCH_WRITER_QUIC_BATCH_WRITER_BUFFER_H_ + +#include "net/third_party/quiche/src/quic/core/quic_circular_deque.h" +#include "net/third_party/quiche/src/quic/core/quic_linux_socket_utils.h" +#include "net/third_party/quiche/src/quic/core/quic_packet_writer.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_aligned.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_ip_address.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_socket_address.h" + +namespace quic { + +// QuicBatchWriterBuffer manages an internal buffer to hold data from multiple +// packets. Packet data are placed continuously within the internal buffer such +// that they can be sent by a QuicGsoBatchWriter. +// This class can also be used by a QuicBatchWriter which uses sendmmsg, +// although it is not optimized for that use case. +class QUIC_EXPORT_PRIVATE QuicBatchWriterBuffer { + public: + QuicBatchWriterBuffer(); + + // Clear all buffered writes, but leave the internal buffer intact. + void Clear(); + + char* GetNextWriteLocation() const; + + // Push a buffered write to the back. + struct QUIC_EXPORT_PRIVATE PushResult { + bool succeeded; + // True in one of the following cases: + // 1) The packet buffer is external and copied to the internal buffer, or + // 2) The packet buffer is from the internal buffer and moved within it. + // This only happens if PopBufferedWrite is called in the middle of a + // in-place push. + // Only valid if |succeeded| is true. + bool buffer_copied; + }; + + PushResult PushBufferedWrite(const char* buffer, + size_t buf_len, + const QuicIpAddress& self_address, + const QuicSocketAddress& peer_address, + const PerPacketOptions* options, + uint64_t release_time); + + void UndoLastPush(); + + // Pop |num_buffered_writes| buffered writes from the front. + // |num_buffered_writes| will be capped to [0, buffered_writes().size()] + // before it is used. + struct QUIC_EXPORT_PRIVATE PopResult { + int32_t num_buffers_popped; + // True if after |num_buffers_popped| buffers are popped from front, the + // remaining buffers are moved to the beginning of the internal buffer. + // This should normally be false. + bool moved_remaining_buffers; + }; + PopResult PopBufferedWrite(int32_t num_buffered_writes); + + const QuicCircularDeque<BufferedWrite>& buffered_writes() const { + return buffered_writes_; + } + + bool IsExternalBuffer(const char* buffer, size_t buf_len) const { + return (buffer + buf_len) <= buffer_ || buffer >= buffer_end(); + } + bool IsInternalBuffer(const char* buffer, size_t buf_len) const { + return buffer >= buffer_ && (buffer + buf_len) <= buffer_end(); + } + + // Number of bytes used in |buffer_|. + // PushBufferedWrite() increases this; PopBufferedWrite decreases this. + size_t SizeInUse() const; + + // Rounded up from |kMaxGsoPacketSize|, which is the maximum allowed + // size of a GSO packet. + static const size_t kBufferSize = 64 * 1024; + + std::string DebugString() const; + + protected: + // Whether the invariants of the buffer are upheld. For debug & test only. + bool Invariants() const; + const char* const buffer_end() const { return buffer_ + sizeof(buffer_); } + QUIC_CACHELINE_ALIGNED char buffer_[kBufferSize]; + QuicCircularDeque<BufferedWrite> buffered_writes_; +}; + +} // namespace quic + +#endif // QUICHE_QUIC_PLATFORM_IMPL_BATCH_WRITER_QUIC_BATCH_WRITER_BUFFER_H_
diff --git a/quic/core/batch_writer/quic_batch_writer_buffer_test.cc b/quic/core/batch_writer/quic_batch_writer_buffer_test.cc new file mode 100644 index 0000000..4cdb747 --- /dev/null +++ b/quic/core/batch_writer/quic_batch_writer_buffer_test.cc
@@ -0,0 +1,281 @@ +// Copyright (c) 2019 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/third_party/quiche/src/quic/core/batch_writer/quic_batch_writer_buffer.h" +#include <memory> +#include <string> + +#include "net/third_party/quiche/src/quic/core/quic_constants.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_ip_address.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_socket_address.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_test.h" + +namespace quic { +namespace test { +namespace { + +class QUIC_EXPORT_PRIVATE TestQuicBatchWriterBuffer + : public QuicBatchWriterBuffer { + public: + using QuicBatchWriterBuffer::buffer_; + using QuicBatchWriterBuffer::buffered_writes_; +}; + +static const size_t kBatchBufferSize = QuicBatchWriterBuffer::kBufferSize; + +class QuicBatchWriterBufferTest : public QuicTest { + public: + QuicBatchWriterBufferTest() { SwitchToNewBuffer(); } + + void SwitchToNewBuffer() { + batch_buffer_ = std::make_unique<TestQuicBatchWriterBuffer>(); + } + + // Fill packet_buffer_ with kMaxOutgoingPacketSize bytes of |c|s. + char* FillPacketBuffer(char c) { + return FillPacketBuffer(c, packet_buffer_, kMaxOutgoingPacketSize); + } + + // Fill |packet_buffer| with kMaxOutgoingPacketSize bytes of |c|s. + char* FillPacketBuffer(char c, char* packet_buffer) { + return FillPacketBuffer(c, packet_buffer, kMaxOutgoingPacketSize); + } + + // Fill |packet_buffer| with |buf_len| bytes of |c|s. + char* FillPacketBuffer(char c, char* packet_buffer, size_t buf_len) { + memset(packet_buffer, c, buf_len); + return packet_buffer; + } + + void CheckBufferedWriteContent(int buffered_write_index, + char buffer_content, + size_t buf_len, + const QuicIpAddress& self_addr, + const QuicSocketAddress& peer_addr, + const PerPacketOptions* options) { + const BufferedWrite& buffered_write = + batch_buffer_->buffered_writes()[buffered_write_index]; + EXPECT_EQ(buf_len, buffered_write.buf_len); + for (size_t i = 0; i < buf_len; ++i) { + EXPECT_EQ(buffer_content, buffered_write.buffer[i]); + if (buffer_content != buffered_write.buffer[i]) { + break; + } + } + EXPECT_EQ(self_addr, buffered_write.self_address); + EXPECT_EQ(peer_addr, buffered_write.peer_address); + if (options == nullptr) { + EXPECT_EQ(nullptr, buffered_write.options); + } else { + EXPECT_EQ(options->release_time_delay, + buffered_write.options->release_time_delay); + } + } + + protected: + std::unique_ptr<TestQuicBatchWriterBuffer> batch_buffer_; + QuicIpAddress self_addr_; + QuicSocketAddress peer_addr_; + uint64_t release_time_ = 0; + char packet_buffer_[kMaxOutgoingPacketSize]; +}; + +class BufferSizeSequence { + public: + explicit BufferSizeSequence( + std::vector<std::pair<std::vector<size_t>, size_t>> stages) + : stages_(std::move(stages)), + total_buf_len_(0), + stage_index_(0), + sequence_index_(0) {} + + size_t Next() { + const std::vector<size_t>& seq = stages_[stage_index_].first; + size_t buf_len = seq[sequence_index_++ % seq.size()]; + total_buf_len_ += buf_len; + if (stages_[stage_index_].second <= total_buf_len_) { + stage_index_ = std::min(stage_index_ + 1, stages_.size() - 1); + } + return buf_len; + } + + private: + const std::vector<std::pair<std::vector<size_t>, size_t>> stages_; + size_t total_buf_len_; + size_t stage_index_; + size_t sequence_index_; +}; + +// Test in-place pushes. A in-place push is a push with a buffer address that is +// equal to the result of GetNextWriteLocation(). +TEST_F(QuicBatchWriterBufferTest, InPlacePushes) { + std::vector<BufferSizeSequence> buffer_size_sequences = { + // Push large writes until the buffer is near full, then switch to 1-byte + // writes. This covers the edge cases when detecting insufficient buffer. + BufferSizeSequence({{{1350}, kBatchBufferSize - 3000}, {{1}, 1e6}}), + // A sequence that looks real. + BufferSizeSequence({{{1, 39, 97, 150, 1350, 1350, 1350, 1350}, 1e6}}), + }; + + for (auto& buffer_size_sequence : buffer_size_sequences) { + SwitchToNewBuffer(); + int64_t num_push_failures = 0; + + while (batch_buffer_->SizeInUse() < kBatchBufferSize) { + size_t buf_len = buffer_size_sequence.Next(); + const bool has_enough_space = + (kBatchBufferSize - batch_buffer_->SizeInUse() >= + kMaxOutgoingPacketSize); + + char* buffer = batch_buffer_->GetNextWriteLocation(); + + if (has_enough_space) { + EXPECT_EQ(batch_buffer_->buffer_ + batch_buffer_->SizeInUse(), buffer); + } else { + EXPECT_EQ(nullptr, buffer); + } + + SCOPED_TRACE(testing::Message() + << "Before Push: buf_len=" << buf_len + << ", has_enough_space=" << has_enough_space + << ", batch_buffer=" << batch_buffer_->DebugString()); + + auto push_result = batch_buffer_->PushBufferedWrite( + buffer, buf_len, self_addr_, peer_addr_, nullptr, release_time_); + if (!push_result.succeeded) { + ++num_push_failures; + } + EXPECT_EQ(has_enough_space, push_result.succeeded); + EXPECT_FALSE(push_result.buffer_copied); + if (!has_enough_space) { + break; + } + } + // Expect one and only one failure from the final push operation. + EXPECT_EQ(1, num_push_failures); + } +} + +// Test some in-place pushes mixed with pushes with external buffers. +TEST_F(QuicBatchWriterBufferTest, MixedPushes) { + // First, a in-place push. + char* buffer = batch_buffer_->GetNextWriteLocation(); + auto push_result = batch_buffer_->PushBufferedWrite( + FillPacketBuffer('A', buffer), kDefaultMaxPacketSize, self_addr_, + peer_addr_, nullptr, release_time_); + EXPECT_TRUE(push_result.succeeded); + EXPECT_FALSE(push_result.buffer_copied); + CheckBufferedWriteContent(0, 'A', kDefaultMaxPacketSize, self_addr_, + peer_addr_, nullptr); + + // Then a push with external buffer. + push_result = batch_buffer_->PushBufferedWrite( + FillPacketBuffer('B'), kDefaultMaxPacketSize, self_addr_, peer_addr_, + nullptr, release_time_); + EXPECT_TRUE(push_result.succeeded); + EXPECT_TRUE(push_result.buffer_copied); + CheckBufferedWriteContent(1, 'B', kDefaultMaxPacketSize, self_addr_, + peer_addr_, nullptr); + + // Then another in-place push. + buffer = batch_buffer_->GetNextWriteLocation(); + push_result = batch_buffer_->PushBufferedWrite( + FillPacketBuffer('C', buffer), kDefaultMaxPacketSize, self_addr_, + peer_addr_, nullptr, release_time_); + EXPECT_TRUE(push_result.succeeded); + EXPECT_FALSE(push_result.buffer_copied); + CheckBufferedWriteContent(2, 'C', kDefaultMaxPacketSize, self_addr_, + peer_addr_, nullptr); + + // Then another push with external buffer. + push_result = batch_buffer_->PushBufferedWrite( + FillPacketBuffer('D'), kDefaultMaxPacketSize, self_addr_, peer_addr_, + nullptr, release_time_); + EXPECT_TRUE(push_result.succeeded); + EXPECT_TRUE(push_result.buffer_copied); + CheckBufferedWriteContent(3, 'D', kDefaultMaxPacketSize, self_addr_, + peer_addr_, nullptr); +} + +TEST_F(QuicBatchWriterBufferTest, PopAll) { + const int kNumBufferedWrites = 10; + for (int i = 0; i < kNumBufferedWrites; ++i) { + EXPECT_TRUE(batch_buffer_ + ->PushBufferedWrite(packet_buffer_, kDefaultMaxPacketSize, + self_addr_, peer_addr_, nullptr, + release_time_) + .succeeded); + } + EXPECT_EQ(kNumBufferedWrites, batch_buffer_->buffered_writes().size()); + + auto pop_result = batch_buffer_->PopBufferedWrite(kNumBufferedWrites); + EXPECT_EQ(0, batch_buffer_->buffered_writes().size()); + EXPECT_EQ(kNumBufferedWrites, pop_result.num_buffers_popped); + EXPECT_FALSE(pop_result.moved_remaining_buffers); +} + +TEST_F(QuicBatchWriterBufferTest, PopPartial) { + const int kNumBufferedWrites = 10; + for (int i = 0; i < kNumBufferedWrites; ++i) { + EXPECT_TRUE(batch_buffer_ + ->PushBufferedWrite(FillPacketBuffer('A' + i), + kDefaultMaxPacketSize - i, self_addr_, + peer_addr_, nullptr, release_time_) + .succeeded); + } + + for (int i = 0; + i < kNumBufferedWrites && !batch_buffer_->buffered_writes().empty(); + ++i) { + const size_t size_before_pop = batch_buffer_->buffered_writes().size(); + const size_t expect_size_after_pop = + size_before_pop < i ? 0 : size_before_pop - i; + batch_buffer_->PopBufferedWrite(i); + ASSERT_EQ(expect_size_after_pop, batch_buffer_->buffered_writes().size()); + const char first_write_content = + 'A' + kNumBufferedWrites - expect_size_after_pop; + const size_t first_write_len = + kDefaultMaxPacketSize - kNumBufferedWrites + expect_size_after_pop; + for (int j = 0; j < expect_size_after_pop; ++j) { + CheckBufferedWriteContent(j, first_write_content + j, first_write_len - j, + self_addr_, peer_addr_, nullptr); + } + } +} + +TEST_F(QuicBatchWriterBufferTest, InPlacePushWithPops) { + // First, a in-place push. + char* buffer = batch_buffer_->GetNextWriteLocation(); + const size_t first_packet_len = 2; + auto push_result = batch_buffer_->PushBufferedWrite( + FillPacketBuffer('A', buffer, first_packet_len), first_packet_len, + self_addr_, peer_addr_, nullptr, release_time_); + EXPECT_TRUE(push_result.succeeded); + EXPECT_FALSE(push_result.buffer_copied); + CheckBufferedWriteContent(0, 'A', first_packet_len, self_addr_, peer_addr_, + nullptr); + + // Simulate the case where the writer wants to do another in-place push, but + // can't do so because it can't be batched with the first buffer. + buffer = batch_buffer_->GetNextWriteLocation(); + const size_t second_packet_len = 1350; + + // Flush the first buffer. + auto pop_result = batch_buffer_->PopBufferedWrite(1); + EXPECT_EQ(1, pop_result.num_buffers_popped); + EXPECT_FALSE(pop_result.moved_remaining_buffers); + + // Now the second push. + push_result = batch_buffer_->PushBufferedWrite( + FillPacketBuffer('B', buffer, second_packet_len), second_packet_len, + self_addr_, peer_addr_, nullptr, release_time_); + EXPECT_TRUE(push_result.succeeded); + EXPECT_TRUE(push_result.buffer_copied); + CheckBufferedWriteContent(0, 'B', second_packet_len, self_addr_, peer_addr_, + nullptr); +} + +} // namespace +} // namespace test +} // namespace quic
diff --git a/quic/core/batch_writer/quic_batch_writer_test.cc b/quic/core/batch_writer/quic_batch_writer_test.cc new file mode 100644 index 0000000..583b248 --- /dev/null +++ b/quic/core/batch_writer/quic_batch_writer_test.cc
@@ -0,0 +1,78 @@ +// Copyright (c) 2019 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/third_party/quiche/src/quic/core/batch_writer/quic_batch_writer_test.h" +#include "net/third_party/quiche/src/quic/core/batch_writer/quic_gso_batch_writer.h" +#include "net/third_party/quiche/src/quic/core/batch_writer/quic_sendmmsg_batch_writer.h" + +namespace quic { +namespace test { +namespace { + +class QuicGsoBatchWriterIOTestDelegate + : public QuicUdpBatchWriterIOTestDelegate { + public: + bool ShouldSkip(const QuicUdpBatchWriterIOTestParams& params) override { + QuicUdpSocketApi socket_api; + int fd = + socket_api.Create(params.address_family, + /*receive_buffer_size=*/kDefaultSocketReceiveBuffer, + /*send_buffer_size=*/kDefaultSocketReceiveBuffer); + if (fd < 0) { + QUIC_LOG(ERROR) << "CreateSocket() failed: " << strerror(errno); + return false; // Let the test fail rather than skip it. + } + const bool gso_not_supported = + QuicLinuxSocketUtils::GetUDPSegmentSize(fd) < 0; + socket_api.Destroy(fd); + + if (gso_not_supported) { + QUIC_LOG(WARNING) << "Test skipped since GSO is not supported."; + return true; + } + + QUIC_LOG(WARNING) << "OK: GSO is supported."; + return false; + } + + void ResetWriter(int fd) override { + writer_ = std::make_unique<QuicGsoBatchWriter>( + std::make_unique<QuicBatchWriterBuffer>(), fd); + } + + QuicUdpBatchWriter* GetWriter() override { return writer_.get(); } + + private: + std::unique_ptr<QuicGsoBatchWriter> writer_; +}; + +INSTANTIATE_TEST_SUITE_P( + QuicGsoBatchWriterTest, + QuicUdpBatchWriterIOTest, + testing::ValuesIn( + MakeQuicBatchWriterTestParams<QuicGsoBatchWriterIOTestDelegate>())); + +class QuicSendmmsgBatchWriterIOTestDelegate + : public QuicUdpBatchWriterIOTestDelegate { + public: + void ResetWriter(int fd) override { + writer_ = std::make_unique<QuicSendmmsgBatchWriter>( + std::make_unique<QuicBatchWriterBuffer>(), fd); + } + + QuicUdpBatchWriter* GetWriter() override { return writer_.get(); } + + private: + std::unique_ptr<QuicSendmmsgBatchWriter> writer_; +}; + +INSTANTIATE_TEST_SUITE_P( + QuicSendmmsgBatchWriterTest, + QuicUdpBatchWriterIOTest, + testing::ValuesIn(MakeQuicBatchWriterTestParams< + QuicSendmmsgBatchWriterIOTestDelegate>())); + +} // namespace +} // namespace test +} // namespace quic
diff --git a/quic/core/batch_writer/quic_batch_writer_test.h b/quic/core/batch_writer/quic_batch_writer_test.h new file mode 100644 index 0000000..b4f36b9 --- /dev/null +++ b/quic/core/batch_writer/quic_batch_writer_test.h
@@ -0,0 +1,284 @@ +// Copyright (c) 2019 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef QUICHE_QUIC_PLATFORM_IMPL_BATCH_WRITER_QUIC_BATCH_WRITER_TEST_H_ +#define QUICHE_QUIC_PLATFORM_IMPL_BATCH_WRITER_QUIC_BATCH_WRITER_TEST_H_ + +#include <sys/socket.h> +#include <sys/types.h> + +#include <iostream> +#include <utility> + +#include "net/third_party/quiche/src/quic/core/batch_writer/quic_batch_writer_base.h" +#include "net/third_party/quiche/src/quic/core/quic_udp_socket.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_test.h" + +namespace quic { +namespace test { + +static bool IsAddressFamilySupported(int address_family) { + static auto check_function = [](int address_family) { + int fd = socket(address_family, SOCK_STREAM, 0); + if (fd < 0) { + QUIC_LOG(ERROR) << "address_family not supported: " << address_family + << ", error: " << strerror(errno); + EXPECT_EQ(EAFNOSUPPORT, errno); + return false; + } + close(fd); + return true; + }; + + if (address_family == AF_INET) { + static const bool ipv4_supported = check_function(AF_INET); + return ipv4_supported; + } + + static const bool ipv6_supported = check_function(AF_INET6); + return ipv6_supported; +} + +static bool CreateSocket(int family, QuicSocketAddress* address, int* fd) { + if (family == AF_INET) { + *address = QuicSocketAddress(QuicIpAddress::Loopback4(), 0); + } else { + DCHECK_EQ(family, AF_INET6); + *address = QuicSocketAddress(QuicIpAddress::Loopback6(), 0); + } + + QuicUdpSocketApi socket_api; + *fd = socket_api.Create(family, + /*receive_buffer_size=*/kDefaultSocketReceiveBuffer, + /*send_buffer_size=*/kDefaultSocketReceiveBuffer); + if (*fd < 0) { + QUIC_LOG(ERROR) << "CreateSocket() failed: " << strerror(errno); + return false; + } + socket_api.EnableDroppedPacketCount(*fd); + + if (!socket_api.Bind(*fd, *address)) { + QUIC_LOG(ERROR) << "Bind failed: " << strerror(errno); + return false; + } + + if (address->FromSocket(*fd) != 0) { + QUIC_LOG(ERROR) << "Unable to get self address. Error: " + << strerror(errno); + return false; + } + return true; +} + +struct QuicUdpBatchWriterIOTestParams; +class QUIC_EXPORT_PRIVATE QuicUdpBatchWriterIOTestDelegate { + public: + virtual ~QuicUdpBatchWriterIOTestDelegate() {} + + virtual bool ShouldSkip(const QuicUdpBatchWriterIOTestParams& params) { + return false; + } + + virtual void ResetWriter(int fd) = 0; + + virtual QuicUdpBatchWriter* GetWriter() = 0; +}; + +struct QUIC_EXPORT_PRIVATE QuicUdpBatchWriterIOTestParams { + // Use shared_ptr because gtest makes copies of test params. + std::shared_ptr<QuicUdpBatchWriterIOTestDelegate> delegate; + int address_family; + int data_size; + int packet_size; + + QUIC_EXPORT_PRIVATE friend std::ostream& operator<<( + std::ostream& os, + const QuicUdpBatchWriterIOTestParams& p) { + os << "{ address_family: " << p.address_family + << " data_size: " << p.data_size << " packet_size: " << p.packet_size + << " }"; + return os; + } +}; + +template <class QuicUdpBatchWriterIOTestDelegateT> +static std::vector<QuicUdpBatchWriterIOTestParams> +MakeQuicBatchWriterTestParams() { + static_assert(std::is_base_of<QuicUdpBatchWriterIOTestDelegate, + QuicUdpBatchWriterIOTestDelegateT>::value, + "<QuicUdpBatchWriterIOTestDelegateT> needs to derive from " + "QuicUdpBatchWriterIOTestDelegate"); + + std::vector<QuicUdpBatchWriterIOTestParams> params; + for (int address_family : {AF_INET, AF_INET6}) { + for (int data_size : {1, 150, 1500, 15000, 64000, 512 * 1024}) { + for (int packet_size : {1, 50, 1350, 1452}) { + if (packet_size <= data_size && (data_size / packet_size < 2000)) { + params.push_back( + {std::make_unique<QuicUdpBatchWriterIOTestDelegateT>(), + address_family, data_size, packet_size}); + } + } + } + } + return params; +} + +// QuicUdpBatchWriterIOTest is a value parameterized test fixture that can be +// used by tests of derived classes of QuicUdpBatchWriter, to verify basic +// packet IO capabilities. +class QUIC_EXPORT_PRIVATE QuicUdpBatchWriterIOTest + : public QuicTestWithParam<QuicUdpBatchWriterIOTestParams> { + protected: + QuicUdpBatchWriterIOTest() + : address_family_(GetParam().address_family), + data_size_(GetParam().data_size), + packet_size_(GetParam().packet_size), + self_socket_(-1), + peer_socket_(-1) { + QUIC_LOG(INFO) << "QuicUdpBatchWriterIOTestParams: " << GetParam(); + EXPECT_TRUE(address_family_ == AF_INET || address_family_ == AF_INET6); + EXPECT_LE(packet_size_, data_size_); + EXPECT_LE(packet_size_, sizeof(packet_buffer_)); + } + + ~QuicUdpBatchWriterIOTest() override { + if (self_socket_ > 0) { + close(self_socket_); + } + if (peer_socket_ > 0) { + close(peer_socket_); + } + } + + // Whether this test should be skipped. A test is passed if skipped. + // A test can be skipped when e.g. it exercises a kernel feature that is not + // available on the system. + bool ShouldSkip() { + if (!IsAddressFamilySupported(address_family_)) { + QUIC_LOG(WARNING) + << "Test skipped since address_family is not supported."; + return true; + } + + return GetParam().delegate->ShouldSkip(GetParam()); + } + + // Initialize a test. + // To fail the test in Initialize, use ASSERT_xx macros. + void Initialize() { + ASSERT_TRUE(CreateSocket(address_family_, &self_address_, &self_socket_)); + ASSERT_TRUE(CreateSocket(address_family_, &peer_address_, &peer_socket_)); + + QUIC_DLOG(INFO) << "Self address: " << self_address_.ToString() << ", fd " + << self_socket_; + QUIC_DLOG(INFO) << "Peer address: " << peer_address_.ToString() << ", fd " + << peer_socket_; + GetParam().delegate->ResetWriter(self_socket_); + } + + QuicUdpBatchWriter* GetWriter() { return GetParam().delegate->GetWriter(); } + + void ValidateWrite() { + char this_packet_content = '\0'; + int this_packet_size; + int num_writes = 0; + int bytes_flushed = 0; + WriteResult result; + + for (int bytes_sent = 0; bytes_sent < data_size_; + bytes_sent += this_packet_size, ++this_packet_content) { + this_packet_size = std::min(packet_size_, data_size_ - bytes_sent); + memset(&packet_buffer_[0], this_packet_content, this_packet_size); + + result = GetWriter()->WritePacket(&packet_buffer_[0], this_packet_size, + self_address_.host(), peer_address_, + nullptr); + + ASSERT_EQ(WRITE_STATUS_OK, result.status) << strerror(result.error_code); + bytes_flushed += result.bytes_written; + ++num_writes; + + QUIC_DVLOG(1) << "[write #" << num_writes + << "] this_packet_size: " << this_packet_size + << ", total_bytes_sent: " << bytes_sent + this_packet_size + << ", bytes_flushed: " << bytes_flushed + << ", pkt content:" << std::hex << int(this_packet_content); + } + + result = GetWriter()->Flush(); + ASSERT_EQ(WRITE_STATUS_OK, result.status) << strerror(result.error_code); + bytes_flushed += result.bytes_written; + ASSERT_EQ(data_size_, bytes_flushed); + + QUIC_LOG(INFO) << "Sent " << data_size_ << " bytes in " << num_writes + << " writes."; + } + + void ValidateRead() { + char this_packet_content = '\0'; + int this_packet_size; + int packets_received = 0; + for (int bytes_received = 0; bytes_received < data_size_; + bytes_received += this_packet_size, ++this_packet_content) { + this_packet_size = std::min(packet_size_, data_size_ - bytes_received); + SCOPED_TRACE(testing::Message() + << "Before ReadPacket: bytes_received=" << bytes_received + << ", this_packet_size=" << this_packet_size); + + QuicUdpSocketApi::ReadPacketResult result; + result.packet_buffer = {&packet_buffer_[0], sizeof(packet_buffer_)}; + result.control_buffer = {&control_buffer_[0], sizeof(control_buffer_)}; + QuicUdpSocketApi().ReadPacket( + peer_socket_, + quic::BitMask64(QuicUdpPacketInfoBit::V4_SELF_IP, + QuicUdpPacketInfoBit::V6_SELF_IP, + QuicUdpPacketInfoBit::PEER_ADDRESS), + &result); + ASSERT_TRUE(result.ok); + ASSERT_TRUE( + result.packet_info.HasValue(QuicUdpPacketInfoBit::PEER_ADDRESS)); + QuicSocketAddress read_peer_address = result.packet_info.peer_address(); + QuicIpAddress read_self_address = read_peer_address.host().IsIPv6() + ? result.packet_info.self_v6_ip() + : result.packet_info.self_v4_ip(); + + EXPECT_EQ(read_self_address, peer_address_.host()); + EXPECT_EQ(read_peer_address, self_address_); + for (int i = 0; i < this_packet_size; ++i) { + EXPECT_EQ(this_packet_content, packet_buffer_[i]); + } + packets_received += this_packet_size; + } + + QUIC_LOG(INFO) << "Received " << data_size_ << " bytes in " + << packets_received << " packets."; + } + + QuicSocketAddress self_address_; + QuicSocketAddress peer_address_; + char packet_buffer_[1500]; + char control_buffer_[kDefaultUdpPacketControlBufferSize]; + int address_family_; + const int data_size_; + const int packet_size_; + int self_socket_; + int peer_socket_; +}; + +TEST_P(QuicUdpBatchWriterIOTest, WriteAndRead) { + if (ShouldSkip()) { + return; + } + + Initialize(); + + ValidateWrite(); + ValidateRead(); +} + +} // namespace test +} // namespace quic + +#endif // QUICHE_QUIC_PLATFORM_IMPL_BATCH_WRITER_QUIC_BATCH_WRITER_TEST_H_
diff --git a/quic/core/batch_writer/quic_gso_batch_writer.cc b/quic/core/batch_writer/quic_gso_batch_writer.cc new file mode 100644 index 0000000..56c0e10 --- /dev/null +++ b/quic/core/batch_writer/quic_gso_batch_writer.cc
@@ -0,0 +1,122 @@ +// Copyright (c) 2019 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/third_party/quiche/src/quic/core/batch_writer/quic_gso_batch_writer.h" + +#include <time.h> +#include <ctime> + +#include "net/third_party/quiche/src/quic/core/quic_linux_socket_utils.h" + +namespace quic { + +QuicGsoBatchWriter::QuicGsoBatchWriter( + std::unique_ptr<QuicBatchWriterBuffer> batch_buffer, + int fd) + : QuicGsoBatchWriter(std::move(batch_buffer), fd, CLOCK_MONOTONIC) {} + +QuicGsoBatchWriter::QuicGsoBatchWriter( + std::unique_ptr<QuicBatchWriterBuffer> batch_buffer, + int fd, + clockid_t clockid_for_release_time) + : QuicUdpBatchWriter(std::move(batch_buffer), fd), + clockid_for_release_time_(clockid_for_release_time), + supports_release_time_( + GetQuicRestartFlag(quic_support_release_time_for_gso) && + QuicLinuxSocketUtils::EnableReleaseTime(fd, + clockid_for_release_time)) { + if (supports_release_time_) { + QUIC_RESTART_FLAG_COUNT(quic_support_release_time_for_gso); + QUIC_LOG_FIRST_N(INFO, 5) << "Release time is enabled."; + } else { + QUIC_LOG_FIRST_N(INFO, 5) << "Release time is not enabled."; + } +} + +QuicGsoBatchWriter::QuicGsoBatchWriter( + std::unique_ptr<QuicBatchWriterBuffer> batch_buffer, + int fd, + clockid_t clockid_for_release_time, + ReleaseTimeForceEnabler enabler) + : QuicUdpBatchWriter(std::move(batch_buffer), fd), + clockid_for_release_time_(clockid_for_release_time), + supports_release_time_(true) { + QUIC_DLOG(INFO) << "Release time forcefully enabled."; +} + +QuicGsoBatchWriter::CanBatchResult QuicGsoBatchWriter::CanBatch( + const char* buffer, + size_t buf_len, + const QuicIpAddress& self_address, + const QuicSocketAddress& peer_address, + const PerPacketOptions* options, + uint64_t release_time) const { + // If there is nothing buffered already, this write will be included in this + // batch. + if (buffered_writes().empty()) { + return CanBatchResult(/*can_batch=*/true, /*must_flush=*/false); + } + + // The new write can be batched if all of the following are true: + // [0] The total number of the GSO segments(one write=one segment, including + // the new write) must not exceed |max_segments|. + // [1] It has the same source and destination addresses as already buffered + // writes. + // [2] It won't cause this batch to exceed kMaxGsoPacketSize. + // [3] Already buffered writes all have the same length. + // [4] Length of already buffered writes must >= length of the new write. + // [5] The new packet has the same release time as buffered writes. + const BufferedWrite& first = buffered_writes().front(); + const BufferedWrite& last = buffered_writes().back(); + size_t max_segments = MaxSegments(first.buf_len); + bool can_batch = + buffered_writes().size() < max_segments && // [0] + last.self_address == self_address && // [1] + last.peer_address == peer_address && // [1] + batch_buffer().SizeInUse() + buf_len <= kMaxGsoPacketSize && // [2] + first.buf_len == last.buf_len && // [3] + first.buf_len >= buf_len && // [4] + (!SupportsReleaseTime() || first.release_time == release_time); // [5] + + // A flush is required if any of the following is true: + // [a] The new write can't be batched. + // [b] Length of the new write is different from the length of already + // buffered writes. + // [c] The total number of the GSO segments, including the new write, reaches + // |max_segments|. + bool must_flush = (!can_batch) || // [a] + (last.buf_len != buf_len) || // [b] + (buffered_writes().size() + 1 == max_segments); // [c] + return CanBatchResult(can_batch, must_flush); +} + +uint64_t QuicGsoBatchWriter::NowInNanosForReleaseTime() const { + struct timespec ts; + + if (clock_gettime(clockid_for_release_time_, &ts) != 0) { + return 0; + } + + return ts.tv_sec * (1000ULL * 1000 * 1000) + ts.tv_nsec; +} + +// static +void QuicGsoBatchWriter::BuildCmsg(QuicMsgHdr* hdr, + const QuicIpAddress& self_address, + uint16_t gso_size, + uint64_t release_time) { + hdr->SetIpInNextCmsg(self_address); + if (gso_size > 0) { + *hdr->GetNextCmsgData<uint16_t>(SOL_UDP, UDP_SEGMENT) = gso_size; + } + if (release_time != 0) { + *hdr->GetNextCmsgData<uint64_t>(SOL_SOCKET, SO_TXTIME) = release_time; + } +} + +QuicGsoBatchWriter::FlushImplResult QuicGsoBatchWriter::FlushImpl() { + return InternalFlushImpl<kCmsgSpace>(BuildCmsg); +} + +} // namespace quic
diff --git a/quic/core/batch_writer/quic_gso_batch_writer.h b/quic/core/batch_writer/quic_gso_batch_writer.h new file mode 100644 index 0000000..64c7e02 --- /dev/null +++ b/quic/core/batch_writer/quic_gso_batch_writer.h
@@ -0,0 +1,114 @@ +// Copyright (c) 2019 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef QUICHE_QUIC_PLATFORM_IMPL_BATCH_WRITER_QUIC_GSO_BATCH_WRITER_H_ +#define QUICHE_QUIC_PLATFORM_IMPL_BATCH_WRITER_QUIC_GSO_BATCH_WRITER_H_ + +#include "net/third_party/quiche/src/quic/core/batch_writer/quic_batch_writer_base.h" + +namespace quic { + +// QuicGsoBatchWriter sends QUIC packets in batches, using UDP socket's generic +// segmentation offload(GSO) capability. +class QUIC_EXPORT_PRIVATE QuicGsoBatchWriter : public QuicUdpBatchWriter { + public: + QuicGsoBatchWriter(std::unique_ptr<QuicBatchWriterBuffer> batch_buffer, + int fd); + + // |clockid_for_release_time|: FQ qdisc requires CLOCK_MONOTONIC, EDF requires + // CLOCK_TAI. + QuicGsoBatchWriter(std::unique_ptr<QuicBatchWriterBuffer> batch_buffer, + int fd, + clockid_t clockid_for_release_time); + + bool SupportsReleaseTime() const final { return supports_release_time_; } + + CanBatchResult CanBatch(const char* buffer, + size_t buf_len, + const QuicIpAddress& self_address, + const QuicSocketAddress& peer_address, + const PerPacketOptions* options, + uint64_t release_time) const override; + + FlushImplResult FlushImpl() override; + + protected: + // Test only constructor to forcefully enable release time. + struct QUIC_EXPORT_PRIVATE ReleaseTimeForceEnabler {}; + QuicGsoBatchWriter(std::unique_ptr<QuicBatchWriterBuffer> batch_buffer, + int fd, + clockid_t clockid_for_release_time, + ReleaseTimeForceEnabler enabler); + + uint64_t NowInNanosForReleaseTime() const override; + + static size_t MaxSegments(size_t gso_size) { + // Max segments should be the min of UDP_MAX_SEGMENTS(64) and + // (((64KB - sizeof(ip hdr) - sizeof(udp hdr)) / MSS) + 1), in the typical + // case of IPv6 packets with 1500-byte MTU, the result is + // ((64KB - 40 - 8) / (1500 - 48)) + 1 = 46 + // However, due a kernel bug, the limit is much lower for tiny gso_sizes. + return gso_size <= 2 ? 16 : 45; + } + + static const int kCmsgSpace = + kCmsgSpaceForIp + kCmsgSpaceForSegmentSize + kCmsgSpaceForTxTime; + static void BuildCmsg(QuicMsgHdr* hdr, + const QuicIpAddress& self_address, + uint16_t gso_size, + uint64_t release_time); + + template <size_t CmsgSpace, typename CmsgBuilderT> + FlushImplResult InternalFlushImpl(CmsgBuilderT cmsg_builder) { + DCHECK(!IsWriteBlocked()); + DCHECK(!buffered_writes().empty()); + + FlushImplResult result = {WriteResult(WRITE_STATUS_OK, 0), + /*num_packets_sent=*/0, /*bytes_written=*/0}; + WriteResult& write_result = result.write_result; + + int total_bytes = batch_buffer().SizeInUse(); + const BufferedWrite& first = buffered_writes().front(); + char cbuf[CmsgSpace]; + QuicMsgHdr hdr(first.buffer, total_bytes, first.peer_address, cbuf, + sizeof(cbuf)); + + uint16_t gso_size = buffered_writes().size() > 1 ? first.buf_len : 0; + cmsg_builder(&hdr, first.self_address, gso_size, first.release_time); + + write_result = QuicLinuxSocketUtils::WritePacket(fd(), hdr); + QUIC_DVLOG(1) << "Write GSO packet result: " << write_result + << ", fd: " << fd() + << ", self_address: " << first.self_address.ToString() + << ", peer_address: " << first.peer_address.ToString() + << ", num_segments: " << buffered_writes().size() + << ", total_bytes: " << total_bytes + << ", gso_size: " << gso_size; + + // All segments in a GSO packet share the same fate - if the write failed, + // none of them are sent, and it's not needed to call PopBufferedWrite(). + if (write_result.status != WRITE_STATUS_OK) { + return result; + } + + result.num_packets_sent = buffered_writes().size(); + + write_result.bytes_written = total_bytes; + result.bytes_written = total_bytes; + + batch_buffer().PopBufferedWrite(buffered_writes().size()); + + QUIC_BUG_IF(!buffered_writes().empty()) + << "All packets should have been written on a successful return"; + return result; + } + + private: + const clockid_t clockid_for_release_time_; + const bool supports_release_time_; +}; + +} // namespace quic + +#endif // QUICHE_QUIC_PLATFORM_IMPL_BATCH_WRITER_QUIC_GSO_BATCH_WRITER_H_
diff --git a/quic/core/batch_writer/quic_gso_batch_writer_test.cc b/quic/core/batch_writer/quic_gso_batch_writer_test.cc new file mode 100644 index 0000000..c7d695a --- /dev/null +++ b/quic/core/batch_writer/quic_gso_batch_writer_test.cc
@@ -0,0 +1,461 @@ +// Copyright (c) 2019 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/third_party/quiche/src/quic/core/batch_writer/quic_gso_batch_writer.h" + +#include <cstdint> +#include <limits> +#include <memory> +#include <utility> + +#include "net/third_party/quiche/src/quic/platform/api/quic_ip_address.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_test.h" +#include "net/third_party/quiche/src/quic/test_tools/quic_mock_syscall_wrapper.h" + +using testing::_; +using testing::Invoke; +using testing::StrictMock; + +namespace quic { +namespace test { +namespace { + +size_t PacketLength(const msghdr* msg) { + size_t length = 0; + for (size_t i = 0; i < msg->msg_iovlen; ++i) { + length += msg->msg_iov[i].iov_len; + } + return length; +} + +uint64_t MillisToNanos(uint64_t milliseconds) { + return milliseconds * 1000000; +} + +class QUIC_EXPORT_PRIVATE TestQuicGsoBatchWriter : public QuicGsoBatchWriter { + public: + using QuicGsoBatchWriter::batch_buffer; + using QuicGsoBatchWriter::buffered_writes; + using QuicGsoBatchWriter::CanBatch; + using QuicGsoBatchWriter::CanBatchResult; + using QuicGsoBatchWriter::GetReleaseTime; + using QuicGsoBatchWriter::MaxSegments; + using QuicGsoBatchWriter::QuicGsoBatchWriter; + + static std::unique_ptr<TestQuicGsoBatchWriter> + NewInstanceWithReleaseTimeSupport() { + return std::unique_ptr<TestQuicGsoBatchWriter>(new TestQuicGsoBatchWriter( + std::make_unique<QuicBatchWriterBuffer>(), + /*fd=*/-1, CLOCK_MONOTONIC, ReleaseTimeForceEnabler())); + } + + uint64_t NowInNanosForReleaseTime() const override { + return MillisToNanos(forced_release_time_ms_); + } + + void ForceReleaseTimeMs(uint64_t forced_release_time_ms) { + forced_release_time_ms_ = forced_release_time_ms; + } + + private: + uint64_t forced_release_time_ms_ = 1; +}; + +struct QUIC_EXPORT_PRIVATE TestPerPacketOptions : public PerPacketOptions { + std::unique_ptr<quic::PerPacketOptions> Clone() const override { + return std::make_unique<TestPerPacketOptions>(*this); + } +}; + +// TestBufferedWrite is a copy-constructible BufferedWrite. +struct QUIC_EXPORT_PRIVATE TestBufferedWrite : public BufferedWrite { + using BufferedWrite::BufferedWrite; + TestBufferedWrite(const TestBufferedWrite& other) + : BufferedWrite(other.buffer, + other.buf_len, + other.self_address, + other.peer_address, + other.options ? other.options->Clone() + : std::unique_ptr<PerPacketOptions>(), + other.release_time) {} +}; + +// Pointed to by all instances of |BatchCriteriaTestData|. Content not used. +static char unused_packet_buffer[kMaxOutgoingPacketSize]; + +struct QUIC_EXPORT_PRIVATE BatchCriteriaTestData { + BatchCriteriaTestData(size_t buf_len, + const QuicIpAddress& self_address, + const QuicSocketAddress& peer_address, + uint64_t release_time, + bool can_batch, + bool must_flush) + : buffered_write(unused_packet_buffer, + buf_len, + self_address, + peer_address, + std::unique_ptr<PerPacketOptions>(), + release_time), + can_batch(can_batch), + must_flush(must_flush) {} + + TestBufferedWrite buffered_write; + // Expected value of CanBatchResult.can_batch when batching |buffered_write|. + bool can_batch; + // Expected value of CanBatchResult.must_flush when batching |buffered_write|. + bool must_flush; +}; + +std::vector<BatchCriteriaTestData> BatchCriteriaTestData_SizeDecrease() { + const QuicIpAddress self_addr; + const QuicSocketAddress peer_addr; + std::vector<BatchCriteriaTestData> test_data_table = { + // clang-format off + // buf_len self_addr peer_addr t_rel can_batch must_flush + {1350, self_addr, peer_addr, 0, true, false}, + {1350, self_addr, peer_addr, 0, true, false}, + {1350, self_addr, peer_addr, 0, true, false}, + {39, self_addr, peer_addr, 0, true, true}, + {39, self_addr, peer_addr, 0, false, true}, + {1350, self_addr, peer_addr, 0, false, true}, + // clang-format on + }; + return test_data_table; +} + +std::vector<BatchCriteriaTestData> BatchCriteriaTestData_SizeIncrease() { + const QuicIpAddress self_addr; + const QuicSocketAddress peer_addr; + std::vector<BatchCriteriaTestData> test_data_table = { + // clang-format off + // buf_len self_addr peer_addr t_rel can_batch must_flush + {1350, self_addr, peer_addr, 0, true, false}, + {1350, self_addr, peer_addr, 0, true, false}, + {1350, self_addr, peer_addr, 0, true, false}, + {1351, self_addr, peer_addr, 0, false, true}, + // clang-format on + }; + return test_data_table; +} + +std::vector<BatchCriteriaTestData> BatchCriteriaTestData_AddressChange() { + const QuicIpAddress self_addr1 = QuicIpAddress::Loopback4(); + const QuicIpAddress self_addr2 = QuicIpAddress::Loopback6(); + const QuicSocketAddress peer_addr1(self_addr1, 666); + const QuicSocketAddress peer_addr2(self_addr1, 777); + const QuicSocketAddress peer_addr3(self_addr2, 666); + const QuicSocketAddress peer_addr4(self_addr2, 777); + std::vector<BatchCriteriaTestData> test_data_table = { + // clang-format off + // buf_len self_addr peer_addr t_rel can_batch must_flush + {1350, self_addr1, peer_addr1, 0, true, false}, + {1350, self_addr1, peer_addr1, 0, true, false}, + {1350, self_addr1, peer_addr1, 0, true, false}, + {1350, self_addr2, peer_addr1, 0, false, true}, + {1350, self_addr1, peer_addr2, 0, false, true}, + {1350, self_addr1, peer_addr3, 0, false, true}, + {1350, self_addr1, peer_addr4, 0, false, true}, + {1350, self_addr1, peer_addr4, 0, false, true}, + // clang-format on + }; + return test_data_table; +} + +std::vector<BatchCriteriaTestData> BatchCriteriaTestData_ReleaseTime1() { + const QuicIpAddress self_addr; + const QuicSocketAddress peer_addr; + std::vector<BatchCriteriaTestData> test_data_table = { + // clang-format off + // buf_len self_addr peer_addr t_rel can_batch must_flush + {1350, self_addr, peer_addr, 5, true, false}, + {1350, self_addr, peer_addr, 5, true, false}, + {1350, self_addr, peer_addr, 5, true, false}, + {1350, self_addr, peer_addr, 9, false, true}, + // clang-format on + }; + return test_data_table; +} + +std::vector<BatchCriteriaTestData> BatchCriteriaTestData_ReleaseTime2() { + const QuicIpAddress self_addr; + const QuicSocketAddress peer_addr; + std::vector<BatchCriteriaTestData> test_data_table = { + // clang-format off + // buf_len self_addr peer_addr t_rel can_batch must_flush + {1350, self_addr, peer_addr, 0, true, false}, + {1350, self_addr, peer_addr, 0, true, false}, + {1350, self_addr, peer_addr, 0, true, false}, + {1350, self_addr, peer_addr, 9, false, true}, + // clang-format on + }; + return test_data_table; +} + +std::vector<BatchCriteriaTestData> BatchCriteriaTestData_MaxSegments( + size_t gso_size) { + const QuicIpAddress self_addr; + const QuicSocketAddress peer_addr; + std::vector<BatchCriteriaTestData> test_data_table; + size_t max_segments = TestQuicGsoBatchWriter::MaxSegments(gso_size); + for (int i = 0; i < max_segments; ++i) { + bool is_last_in_batch = (i + 1 == max_segments); + test_data_table.push_back({gso_size, self_addr, peer_addr, + /*release_time=*/0, true, is_last_in_batch}); + } + test_data_table.push_back( + {gso_size, self_addr, peer_addr, /*release_time=*/0, false, true}); + return test_data_table; +} + +class QuicGsoBatchWriterTest : public QuicTest { + protected: + WriteResult WritePacket(QuicGsoBatchWriter* writer, size_t packet_size) { + return writer->WritePacket(&packet_buffer_[0], packet_size, self_address_, + peer_address_, nullptr); + } + + WriteResult WritePacketWithOptions(QuicGsoBatchWriter* writer, + PerPacketOptions* options) { + return writer->WritePacket(&packet_buffer_[0], 1350, self_address_, + peer_address_, options); + } + + QuicIpAddress self_address_ = QuicIpAddress::Any4(); + QuicSocketAddress peer_address_{QuicIpAddress::Any4(), 443}; + char packet_buffer_[1500]; + StrictMock<MockQuicSyscallWrapper> mock_syscalls_; + ScopedGlobalSyscallWrapperOverride syscall_override_{&mock_syscalls_}; +}; + +TEST_F(QuicGsoBatchWriterTest, BatchCriteria) { + std::unique_ptr<TestQuicGsoBatchWriter> writer; + + std::vector<std::vector<BatchCriteriaTestData>> test_data_tables; + test_data_tables.emplace_back(BatchCriteriaTestData_SizeDecrease()); + test_data_tables.emplace_back(BatchCriteriaTestData_SizeIncrease()); + test_data_tables.emplace_back(BatchCriteriaTestData_AddressChange()); + test_data_tables.emplace_back(BatchCriteriaTestData_ReleaseTime1()); + test_data_tables.emplace_back(BatchCriteriaTestData_ReleaseTime2()); + test_data_tables.emplace_back(BatchCriteriaTestData_MaxSegments(1)); + test_data_tables.emplace_back(BatchCriteriaTestData_MaxSegments(2)); + test_data_tables.emplace_back(BatchCriteriaTestData_MaxSegments(1350)); + + for (size_t i = 0; i < test_data_tables.size(); ++i) { + writer = TestQuicGsoBatchWriter::NewInstanceWithReleaseTimeSupport(); + + const auto& test_data_table = test_data_tables[i]; + for (size_t j = 0; j < test_data_table.size(); ++j) { + const BatchCriteriaTestData& test_data = test_data_table[j]; + SCOPED_TRACE(testing::Message() << "i=" << i << ", j=" << j); + TestQuicGsoBatchWriter::CanBatchResult result = writer->CanBatch( + test_data.buffered_write.buffer, test_data.buffered_write.buf_len, + test_data.buffered_write.self_address, + test_data.buffered_write.peer_address, + /*options=*/nullptr, test_data.buffered_write.release_time); + + ASSERT_EQ(test_data.can_batch, result.can_batch); + ASSERT_EQ(test_data.must_flush, result.must_flush); + + if (result.can_batch) { + ASSERT_TRUE( + writer->batch_buffer() + .PushBufferedWrite(test_data.buffered_write.buffer, + test_data.buffered_write.buf_len, + test_data.buffered_write.self_address, + test_data.buffered_write.peer_address, + /*options=*/nullptr, + test_data.buffered_write.release_time) + .succeeded); + } + } + } +} + +TEST_F(QuicGsoBatchWriterTest, WriteSuccess) { + TestQuicGsoBatchWriter writer(std::make_unique<QuicBatchWriterBuffer>(), + /*fd=*/-1); + + ASSERT_EQ(WriteResult(WRITE_STATUS_OK, 0), WritePacket(&writer, 1000)); + + EXPECT_CALL(mock_syscalls_, Sendmsg(_, _, _)) + .WillOnce(Invoke([](int sockfd, const msghdr* msg, int flags) { + EXPECT_EQ(1100u, PacketLength(msg)); + return 1100; + })); + ASSERT_EQ(WriteResult(WRITE_STATUS_OK, 1100), WritePacket(&writer, 100)); + ASSERT_EQ(0u, writer.batch_buffer().SizeInUse()); + ASSERT_EQ(0u, writer.buffered_writes().size()); +} + +TEST_F(QuicGsoBatchWriterTest, WriteBlockDataNotBuffered) { + TestQuicGsoBatchWriter writer(std::make_unique<QuicBatchWriterBuffer>(), + /*fd=*/-1); + + ASSERT_EQ(WriteResult(WRITE_STATUS_OK, 0), WritePacket(&writer, 100)); + ASSERT_EQ(WriteResult(WRITE_STATUS_OK, 0), WritePacket(&writer, 100)); + + EXPECT_CALL(mock_syscalls_, Sendmsg(_, _, _)) + .WillOnce(Invoke([](int sockfd, const msghdr* msg, int flags) { + EXPECT_EQ(200u, PacketLength(msg)); + errno = EWOULDBLOCK; + return -1; + })); + ASSERT_EQ(WriteResult(WRITE_STATUS_BLOCKED, EWOULDBLOCK), + WritePacket(&writer, 150)); + ASSERT_EQ(200u, writer.batch_buffer().SizeInUse()); + ASSERT_EQ(2u, writer.buffered_writes().size()); +} + +TEST_F(QuicGsoBatchWriterTest, WriteBlockDataBuffered) { + TestQuicGsoBatchWriter writer(std::make_unique<QuicBatchWriterBuffer>(), + /*fd=*/-1); + + ASSERT_EQ(WriteResult(WRITE_STATUS_OK, 0), WritePacket(&writer, 100)); + ASSERT_EQ(WriteResult(WRITE_STATUS_OK, 0), WritePacket(&writer, 100)); + + EXPECT_CALL(mock_syscalls_, Sendmsg(_, _, _)) + .WillOnce(Invoke([](int sockfd, const msghdr* msg, int flags) { + EXPECT_EQ(250u, PacketLength(msg)); + errno = EWOULDBLOCK; + return -1; + })); + ASSERT_EQ(WriteResult(WRITE_STATUS_BLOCKED_DATA_BUFFERED, EWOULDBLOCK), + WritePacket(&writer, 50)); + ASSERT_EQ(250u, writer.batch_buffer().SizeInUse()); + ASSERT_EQ(3u, writer.buffered_writes().size()); +} + +TEST_F(QuicGsoBatchWriterTest, WriteErrorWithoutDataBuffered) { + TestQuicGsoBatchWriter writer(std::make_unique<QuicBatchWriterBuffer>(), + /*fd=*/-1); + + ASSERT_EQ(WriteResult(WRITE_STATUS_OK, 0), WritePacket(&writer, 100)); + ASSERT_EQ(WriteResult(WRITE_STATUS_OK, 0), WritePacket(&writer, 100)); + + EXPECT_CALL(mock_syscalls_, Sendmsg(_, _, _)) + .WillOnce(Invoke([](int sockfd, const msghdr* msg, int flags) { + EXPECT_EQ(200u, PacketLength(msg)); + errno = EPERM; + return -1; + })); + WriteResult error_result = WritePacket(&writer, 150); + ASSERT_EQ(WriteResult(WRITE_STATUS_ERROR, EPERM), error_result); + + ASSERT_EQ(3u, error_result.dropped_packets); + ASSERT_EQ(0u, writer.batch_buffer().SizeInUse()); + ASSERT_EQ(0u, writer.buffered_writes().size()); +} + +TEST_F(QuicGsoBatchWriterTest, WriteErrorAfterDataBuffered) { + TestQuicGsoBatchWriter writer(std::make_unique<QuicBatchWriterBuffer>(), + /*fd=*/-1); + + ASSERT_EQ(WriteResult(WRITE_STATUS_OK, 0), WritePacket(&writer, 100)); + ASSERT_EQ(WriteResult(WRITE_STATUS_OK, 0), WritePacket(&writer, 100)); + + EXPECT_CALL(mock_syscalls_, Sendmsg(_, _, _)) + .WillOnce(Invoke([](int sockfd, const msghdr* msg, int flags) { + EXPECT_EQ(250u, PacketLength(msg)); + errno = EPERM; + return -1; + })); + WriteResult error_result = WritePacket(&writer, 50); + ASSERT_EQ(WriteResult(WRITE_STATUS_ERROR, EPERM), error_result); + + ASSERT_EQ(3u, error_result.dropped_packets); + ASSERT_EQ(0u, writer.batch_buffer().SizeInUse()); + ASSERT_EQ(0u, writer.buffered_writes().size()); +} + +TEST_F(QuicGsoBatchWriterTest, FlushError) { + TestQuicGsoBatchWriter writer(std::make_unique<QuicBatchWriterBuffer>(), + /*fd=*/-1); + + ASSERT_EQ(WriteResult(WRITE_STATUS_OK, 0), WritePacket(&writer, 100)); + ASSERT_EQ(WriteResult(WRITE_STATUS_OK, 0), WritePacket(&writer, 100)); + + EXPECT_CALL(mock_syscalls_, Sendmsg(_, _, _)) + .WillOnce(Invoke([](int sockfd, const msghdr* msg, int flags) { + EXPECT_EQ(200u, PacketLength(msg)); + errno = EINVAL; + return -1; + })); + WriteResult error_result = writer.Flush(); + ASSERT_EQ(WriteResult(WRITE_STATUS_ERROR, EINVAL), error_result); + + ASSERT_EQ(2u, error_result.dropped_packets); + ASSERT_EQ(0u, writer.batch_buffer().SizeInUse()); + ASSERT_EQ(0u, writer.buffered_writes().size()); +} + +TEST_F(QuicGsoBatchWriterTest, ReleaseTimeNullOptions) { + auto writer = TestQuicGsoBatchWriter::NewInstanceWithReleaseTimeSupport(); + EXPECT_EQ(0, writer->GetReleaseTime(nullptr)); +} + +TEST_F(QuicGsoBatchWriterTest, ReleaseTime) { + const WriteResult write_buffered(WRITE_STATUS_OK, 0); + + auto writer = TestQuicGsoBatchWriter::NewInstanceWithReleaseTimeSupport(); + + TestPerPacketOptions options; + EXPECT_TRUE(options.release_time_delay.IsZero()); + EXPECT_FALSE(options.allow_burst); + EXPECT_EQ(MillisToNanos(1), writer->GetReleaseTime(&options)); + + // The 1st packet has no delay. + ASSERT_EQ(write_buffered, WritePacketWithOptions(writer.get(), &options)); + EXPECT_EQ(MillisToNanos(1), writer->buffered_writes().back().release_time); + + // The 2nd packet has some delay, but allows burst. + options.release_time_delay = QuicTime::Delta::FromMilliseconds(3); + options.allow_burst = true; + ASSERT_EQ(write_buffered, WritePacketWithOptions(writer.get(), &options)); + EXPECT_EQ(MillisToNanos(1), writer->buffered_writes().back().release_time); + + // The 3rd packet has more delay and does not allow burst. + // The first 2 packets are flushed due to different release time. + EXPECT_CALL(mock_syscalls_, Sendmsg(_, _, _)) + .WillOnce(Invoke([](int sockfd, const msghdr* msg, int flags) { + EXPECT_EQ(2700u, PacketLength(msg)); + errno = 0; + return 0; + })); + options.release_time_delay = QuicTime::Delta::FromMilliseconds(5); + options.allow_burst = false; + ASSERT_EQ(WriteResult(WRITE_STATUS_OK, 2700), + WritePacketWithOptions(writer.get(), &options)); + EXPECT_EQ(MillisToNanos(6), writer->buffered_writes().back().release_time); + + // The 4th packet has same delay, but allows burst. + options.allow_burst = true; + ASSERT_EQ(write_buffered, WritePacketWithOptions(writer.get(), &options)); + EXPECT_EQ(MillisToNanos(6), writer->buffered_writes().back().release_time); + + // The 5th packet has same delay, allows burst, but is shorter. + // Packets 3,4 and 5 are flushed. + EXPECT_CALL(mock_syscalls_, Sendmsg(_, _, _)) + .WillOnce(Invoke([](int sockfd, const msghdr* msg, int flags) { + EXPECT_EQ(3000u, PacketLength(msg)); + errno = 0; + return 0; + })); + options.allow_burst = true; + EXPECT_EQ(MillisToNanos(6), writer->GetReleaseTime(&options)); + ASSERT_EQ(WriteResult(WRITE_STATUS_OK, 3000), + writer->WritePacket(&packet_buffer_[0], 300, self_address_, + peer_address_, &options)); + EXPECT_TRUE(writer->buffered_writes().empty()); + + // Pretend 1ms has elapsed and the 6th packet has 1ms less delay. In other + // words, the release time should still be the same as packets 3-5. + writer->ForceReleaseTimeMs(2); + options.release_time_delay = QuicTime::Delta::FromMilliseconds(4); + ASSERT_EQ(write_buffered, WritePacketWithOptions(writer.get(), &options)); + EXPECT_EQ(MillisToNanos(6), writer->buffered_writes().back().release_time); +} + +} // namespace +} // namespace test +} // namespace quic
diff --git a/quic/core/batch_writer/quic_sendmmsg_batch_writer.cc b/quic/core/batch_writer/quic_sendmmsg_batch_writer.cc new file mode 100644 index 0000000..efd2179 --- /dev/null +++ b/quic/core/batch_writer/quic_sendmmsg_batch_writer.cc
@@ -0,0 +1,83 @@ +// Copyright (c) 2019 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/third_party/quiche/src/quic/core/batch_writer/quic_sendmmsg_batch_writer.h" + +namespace quic { + +QuicSendmmsgBatchWriter::QuicSendmmsgBatchWriter( + std::unique_ptr<QuicBatchWriterBuffer> batch_buffer, + int fd) + : QuicUdpBatchWriter(std::move(batch_buffer), fd) {} + +QuicSendmmsgBatchWriter::CanBatchResult QuicSendmmsgBatchWriter::CanBatch( + const char* buffer, + size_t buf_len, + const QuicIpAddress& self_address, + const QuicSocketAddress& peer_address, + const PerPacketOptions* options, + uint64_t release_time) const { + return CanBatchResult(/*can_batch=*/true, /*must_flush=*/false); +} + +QuicSendmmsgBatchWriter::FlushImplResult QuicSendmmsgBatchWriter::FlushImpl() { + return InternalFlushImpl( + kCmsgSpaceForIp, + [](QuicMMsgHdr* mhdr, int i, const BufferedWrite& buffered_write) { + mhdr->SetIpInNextCmsg(i, buffered_write.self_address); + }); +} + +QuicSendmmsgBatchWriter::FlushImplResult +QuicSendmmsgBatchWriter::InternalFlushImpl(size_t cmsg_space, + const CmsgBuilder& cmsg_builder) { + DCHECK(!IsWriteBlocked()); + DCHECK(!buffered_writes().empty()); + + FlushImplResult result = {WriteResult(WRITE_STATUS_OK, 0), + /*num_packets_sent=*/0, /*bytes_written=*/0}; + WriteResult& write_result = result.write_result; + + auto first = buffered_writes().cbegin(); + const auto last = buffered_writes().cend(); + while (first != last) { + QuicMMsgHdr mhdr(first, last, cmsg_space, cmsg_builder); + + int num_packets_sent; + write_result = QuicLinuxSocketUtils::WriteMultiplePackets( + fd(), &mhdr, &num_packets_sent); + QUIC_DVLOG(1) << "WriteMultiplePackets sent " << num_packets_sent + << " out of " << mhdr.num_msgs() + << " packets. WriteResult=" << write_result; + + if (write_result.status != WRITE_STATUS_OK) { + DCHECK_EQ(0, num_packets_sent); + break; + } else if (num_packets_sent == 0) { + QUIC_BUG << "WriteMultiplePackets returned OK, but no packets were sent."; + write_result = WriteResult(WRITE_STATUS_ERROR, EIO); + break; + } + + first += num_packets_sent; + + result.num_packets_sent += num_packets_sent; + result.bytes_written += write_result.bytes_written; + } + + // Call PopBufferedWrite() even if write_result.status is not WRITE_STATUS_OK, + // to deal with partial writes. + batch_buffer().PopBufferedWrite(result.num_packets_sent); + + if (write_result.status != WRITE_STATUS_OK) { + return result; + } + + QUIC_BUG_IF(!buffered_writes().empty()) + << "All packets should have been written on a successful return"; + write_result.bytes_written = result.bytes_written; + return result; +} + +} // namespace quic
diff --git a/quic/core/batch_writer/quic_sendmmsg_batch_writer.h b/quic/core/batch_writer/quic_sendmmsg_batch_writer.h new file mode 100644 index 0000000..26a728e --- /dev/null +++ b/quic/core/batch_writer/quic_sendmmsg_batch_writer.h
@@ -0,0 +1,35 @@ +// Copyright (c) 2019 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef QUICHE_QUIC_PLATFORM_IMPL_QUIC_SENDMMSG_BATCH_WRITER_H_ +#define QUICHE_QUIC_PLATFORM_IMPL_QUIC_SENDMMSG_BATCH_WRITER_H_ + +#include "net/third_party/quiche/src/quic/core/batch_writer/quic_batch_writer_base.h" +#include "net/third_party/quiche/src/quic/core/quic_linux_socket_utils.h" + +namespace quic { + +class QUIC_EXPORT_PRIVATE QuicSendmmsgBatchWriter : public QuicUdpBatchWriter { + public: + QuicSendmmsgBatchWriter(std::unique_ptr<QuicBatchWriterBuffer> batch_buffer, + int fd); + + CanBatchResult CanBatch(const char* buffer, + size_t buf_len, + const QuicIpAddress& self_address, + const QuicSocketAddress& peer_address, + const PerPacketOptions* options, + uint64_t release_time) const override; + + FlushImplResult FlushImpl() override; + + protected: + typedef QuicMMsgHdr::ControlBufferInitializer CmsgBuilder; + FlushImplResult InternalFlushImpl(size_t cmsg_space, + const CmsgBuilder& cmsg_builder); +}; + +} // namespace quic + +#endif // QUICHE_QUIC_PLATFORM_IMPL_QUIC_SENDMMSG_BATCH_WRITER_H_
diff --git a/quic/core/batch_writer/quic_sendmmsg_batch_writer_test.cc b/quic/core/batch_writer/quic_sendmmsg_batch_writer_test.cc new file mode 100644 index 0000000..4dcc33c --- /dev/null +++ b/quic/core/batch_writer/quic_sendmmsg_batch_writer_test.cc
@@ -0,0 +1,15 @@ +// Copyright (c) 2019 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/third_party/quiche/src/quic/core/batch_writer/quic_sendmmsg_batch_writer.h" + +namespace quic { +namespace test { +namespace { + +// Add tests here. + +} // namespace +} // namespace test +} // namespace quic
diff --git a/quic/core/quic_linux_socket_utils.cc b/quic/core/quic_linux_socket_utils.cc new file mode 100644 index 0000000..5100a92 --- /dev/null +++ b/quic/core/quic_linux_socket_utils.cc
@@ -0,0 +1,314 @@ +// Copyright (c) 2019 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/third_party/quiche/src/quic/core/quic_linux_socket_utils.h" + +#include <linux/net_tstamp.h> +#include <netinet/in.h> +#include <cstdint> + +#include "net/third_party/quiche/src/quic/core/quic_syscall_wrapper.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_ip_address.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_logging.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_socket_address.h" + +namespace quic { + +QuicMsgHdr::QuicMsgHdr(const char* buffer, + size_t buf_len, + const QuicSocketAddress& peer_address, + char* cbuf, + size_t cbuf_size) + : iov_{const_cast<char*>(buffer), buf_len}, + cbuf_(cbuf), + cbuf_size_(cbuf_size), + cmsg_(nullptr) { + // Only support unconnected sockets. + DCHECK(peer_address.IsInitialized()); + + raw_peer_address_ = peer_address.generic_address(); + hdr_.msg_name = &raw_peer_address_; + hdr_.msg_namelen = raw_peer_address_.ss_family == AF_INET + ? sizeof(sockaddr_in) + : sizeof(sockaddr_in6); + + hdr_.msg_iov = &iov_; + hdr_.msg_iovlen = 1; + hdr_.msg_flags = 0; + + hdr_.msg_control = nullptr; + hdr_.msg_controllen = 0; +} + +void QuicMsgHdr::SetIpInNextCmsg(const QuicIpAddress& self_address) { + if (!self_address.IsInitialized()) { + return; + } + + if (self_address.IsIPv4()) { + QuicLinuxSocketUtils::SetIpInfoInCmsgData( + self_address, GetNextCmsgData<in_pktinfo>(IPPROTO_IP, IP_PKTINFO)); + } else { + QuicLinuxSocketUtils::SetIpInfoInCmsgData( + self_address, GetNextCmsgData<in6_pktinfo>(IPPROTO_IPV6, IPV6_PKTINFO)); + } +} + +void* QuicMsgHdr::GetNextCmsgDataInternal(int cmsg_level, + int cmsg_type, + size_t data_size) { + // msg_controllen needs to be increased first, otherwise CMSG_NXTHDR will + // return nullptr. + hdr_.msg_controllen += CMSG_SPACE(data_size); + DCHECK_LE(hdr_.msg_controllen, cbuf_size_); + + if (cmsg_ == nullptr) { + DCHECK_EQ(nullptr, hdr_.msg_control); + memset(cbuf_, 0, cbuf_size_); + hdr_.msg_control = cbuf_; + cmsg_ = CMSG_FIRSTHDR(&hdr_); + } else { + DCHECK_NE(nullptr, hdr_.msg_control); + cmsg_ = CMSG_NXTHDR(&hdr_, cmsg_); + } + + DCHECK_NE(nullptr, cmsg_) << "Insufficient control buffer space"; + + cmsg_->cmsg_len = CMSG_LEN(data_size); + cmsg_->cmsg_level = cmsg_level; + cmsg_->cmsg_type = cmsg_type; + + return CMSG_DATA(cmsg_); +} + +void QuicMMsgHdr::InitOneHeader(int i, const BufferedWrite& buffered_write) { + mmsghdr* mhdr = GetMMsgHdr(i); + msghdr* hdr = &mhdr->msg_hdr; + iovec* iov = GetIov(i); + + iov->iov_base = const_cast<char*>(buffered_write.buffer); + iov->iov_len = buffered_write.buf_len; + hdr->msg_iov = iov; + hdr->msg_iovlen = 1; + hdr->msg_control = nullptr; + hdr->msg_controllen = 0; + + // Only support unconnected sockets. + DCHECK(buffered_write.peer_address.IsInitialized()); + + sockaddr_storage* peer_address_storage = GetPeerAddressStorage(i); + *peer_address_storage = buffered_write.peer_address.generic_address(); + hdr->msg_name = peer_address_storage; + hdr->msg_namelen = peer_address_storage->ss_family == AF_INET + ? sizeof(sockaddr_in) + : sizeof(sockaddr_in6); +} + +void QuicMMsgHdr::SetIpInNextCmsg(int i, const QuicIpAddress& self_address) { + if (!self_address.IsInitialized()) { + return; + } + + if (self_address.IsIPv4()) { + QuicLinuxSocketUtils::SetIpInfoInCmsgData( + self_address, GetNextCmsgData<in_pktinfo>(i, IPPROTO_IP, IP_PKTINFO)); + } else { + QuicLinuxSocketUtils::SetIpInfoInCmsgData( + self_address, + GetNextCmsgData<in6_pktinfo>(i, IPPROTO_IPV6, IPV6_PKTINFO)); + } +} + +void* QuicMMsgHdr::GetNextCmsgDataInternal(int i, + int cmsg_level, + int cmsg_type, + size_t data_size) { + mmsghdr* mhdr = GetMMsgHdr(i); + msghdr* hdr = &mhdr->msg_hdr; + cmsghdr*& cmsg = *GetCmsgHdr(i); + + // msg_controllen needs to be increased first, otherwise CMSG_NXTHDR will + // return nullptr. + hdr->msg_controllen += CMSG_SPACE(data_size); + DCHECK_LE(hdr->msg_controllen, cbuf_size_); + + if (cmsg == nullptr) { + DCHECK_EQ(nullptr, hdr->msg_control); + hdr->msg_control = GetCbuf(i); + cmsg = CMSG_FIRSTHDR(hdr); + } else { + DCHECK_NE(nullptr, hdr->msg_control); + cmsg = CMSG_NXTHDR(hdr, cmsg); + } + + DCHECK_NE(nullptr, cmsg) << "Insufficient control buffer space"; + + cmsg->cmsg_len = CMSG_LEN(data_size); + cmsg->cmsg_level = cmsg_level; + cmsg->cmsg_type = cmsg_type; + + return CMSG_DATA(cmsg); +} + +int QuicMMsgHdr::num_bytes_sent(int num_packets_sent) { + DCHECK_LE(0, num_packets_sent); + DCHECK_LE(num_packets_sent, num_msgs_); + + int bytes_sent = 0; + iovec* iov = GetIov(0); + for (int i = 0; i < num_packets_sent; ++i) { + bytes_sent += iov[i].iov_len; + } + return bytes_sent; +} + +// static +int QuicLinuxSocketUtils::GetUDPSegmentSize(int fd) { + int optval; + socklen_t optlen = sizeof(optval); + int rc = getsockopt(fd, SOL_UDP, UDP_SEGMENT, &optval, &optlen); + if (rc < 0) { + QUIC_LOG_EVERY_N_SEC(INFO, 10) + << "getsockopt(UDP_SEGMENT) failed: " << strerror(errno); + return -1; + } + QUIC_LOG_EVERY_N_SEC(INFO, 10) + << "getsockopt(UDP_SEGMENT) returned segment size: " << optval; + return optval; +} + +// static +bool QuicLinuxSocketUtils::EnableReleaseTime(int fd, clockid_t clockid) { + // TODO(wub): Change to sock_txtime once it is available in linux/net_tstamp.h + struct LinuxSockTxTime { + clockid_t clockid; /* reference clockid */ + uint32_t flags; /* flags defined by enum txtime_flags */ + }; + + LinuxSockTxTime so_txtime_val = {.clockid = clockid}; + + if (setsockopt(fd, SOL_SOCKET, SO_TXTIME, &so_txtime_val, + sizeof(so_txtime_val)) != 0) { + QUIC_LOG_EVERY_N_SEC(INFO, 10) + << "setsockopt(SOL_SOCKET,SO_TXTIME) failed: " << strerror(errno); + return false; + } + + return true; +} + +// static +bool QuicLinuxSocketUtils::GetTtlFromMsghdr(struct msghdr* hdr, int* ttl) { + if (hdr->msg_controllen > 0) { + struct cmsghdr* cmsg; + for (cmsg = CMSG_FIRSTHDR(hdr); cmsg != nullptr; + cmsg = CMSG_NXTHDR(hdr, cmsg)) { + if ((cmsg->cmsg_level == IPPROTO_IP && cmsg->cmsg_type == IP_TTL) || + (cmsg->cmsg_level == IPPROTO_IPV6 && + cmsg->cmsg_type == IPV6_HOPLIMIT)) { + *ttl = *(reinterpret_cast<int*>(CMSG_DATA(cmsg))); + return true; + } + } + } + return false; +} + +// static +void QuicLinuxSocketUtils::SetIpInfoInCmsgData( + const QuicIpAddress& self_address, + void* cmsg_data) { + DCHECK(self_address.IsInitialized()); + const std::string& address_str = self_address.ToPackedString(); + if (self_address.IsIPv4()) { + in_pktinfo* pktinfo = static_cast<in_pktinfo*>(cmsg_data); + pktinfo->ipi_ifindex = 0; + memcpy(&pktinfo->ipi_spec_dst, address_str.c_str(), address_str.length()); + } else if (self_address.IsIPv6()) { + in6_pktinfo* pktinfo = static_cast<in6_pktinfo*>(cmsg_data); + memcpy(&pktinfo->ipi6_addr, address_str.c_str(), address_str.length()); + } else { + QUIC_BUG << "Unrecognized IPAddress"; + } +} + +// static +size_t QuicLinuxSocketUtils::SetIpInfoInCmsg(const QuicIpAddress& self_address, + cmsghdr* cmsg) { + std::string address_string; + if (self_address.IsIPv4()) { + cmsg->cmsg_len = CMSG_LEN(sizeof(in_pktinfo)); + cmsg->cmsg_level = IPPROTO_IP; + cmsg->cmsg_type = IP_PKTINFO; + in_pktinfo* pktinfo = reinterpret_cast<in_pktinfo*>(CMSG_DATA(cmsg)); + memset(pktinfo, 0, sizeof(in_pktinfo)); + pktinfo->ipi_ifindex = 0; + address_string = self_address.ToPackedString(); + memcpy(&pktinfo->ipi_spec_dst, address_string.c_str(), + address_string.length()); + return sizeof(in_pktinfo); + } else if (self_address.IsIPv6()) { + cmsg->cmsg_len = CMSG_LEN(sizeof(in6_pktinfo)); + cmsg->cmsg_level = IPPROTO_IPV6; + cmsg->cmsg_type = IPV6_PKTINFO; + in6_pktinfo* pktinfo = reinterpret_cast<in6_pktinfo*>(CMSG_DATA(cmsg)); + memset(pktinfo, 0, sizeof(in6_pktinfo)); + address_string = self_address.ToPackedString(); + memcpy(&pktinfo->ipi6_addr, address_string.c_str(), + address_string.length()); + return sizeof(in6_pktinfo); + } else { + QUIC_BUG << "Unrecognized IPAddress"; + return 0; + } +} + +// static +WriteResult QuicLinuxSocketUtils::WritePacket(int fd, const QuicMsgHdr& hdr) { + int rc; + do { + rc = GetGlobalSyscallWrapper()->Sendmsg(fd, hdr.hdr(), 0); + } while (rc < 0 && errno == EINTR); + if (rc >= 0) { + return WriteResult(WRITE_STATUS_OK, rc); + } + return WriteResult((errno == EAGAIN || errno == EWOULDBLOCK) + ? WRITE_STATUS_BLOCKED + : WRITE_STATUS_ERROR, + errno); +} + +// static +WriteResult QuicLinuxSocketUtils::WriteMultiplePackets(int fd, + QuicMMsgHdr* mhdr, + int* num_packets_sent) { + *num_packets_sent = 0; + + if (mhdr->num_msgs() <= 0) { + return WriteResult(WRITE_STATUS_ERROR, EINVAL); + } + + int rc; + do { + rc = GetGlobalSyscallWrapper()->Sendmmsg(fd, mhdr->mhdr(), mhdr->num_msgs(), + 0); + } while (rc < 0 && errno == EINTR); + + if (rc > 0) { + *num_packets_sent = rc; + + return WriteResult(WRITE_STATUS_OK, mhdr->num_bytes_sent(rc)); + } else if (rc == 0) { + QUIC_BUG << "sendmmsg returned 0, returning WRITE_STATUS_ERROR. errno: " + << errno; + errno = EIO; + } + + return WriteResult((errno == EAGAIN || errno == EWOULDBLOCK) + ? WRITE_STATUS_BLOCKED + : WRITE_STATUS_ERROR, + errno); +} + +} // namespace quic
diff --git a/quic/core/quic_linux_socket_utils.h b/quic/core/quic_linux_socket_utils.h new file mode 100644 index 0000000..4091657 --- /dev/null +++ b/quic/core/quic_linux_socket_utils.h
@@ -0,0 +1,298 @@ +// Copyright (c) 2019 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef QUICHE_QUIC_PLATFORM_IMPL_QUIC_LINUX_SOCKET_UTILS_H_ +#define QUICHE_QUIC_PLATFORM_IMPL_QUIC_LINUX_SOCKET_UTILS_H_ + +#include <errno.h> +#include <stddef.h> +#include <string.h> +#include <sys/socket.h> +#include <sys/uio.h> +#include <deque> +#include <functional> +#include <iterator> +#include <memory> +#include <type_traits> +#include <utility> + +#include "net/third_party/quiche/src/quic/core/quic_packet_writer.h" +#include "net/third_party/quiche/src/quic/core/quic_types.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_bug_tracker.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_ip_address.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_logging.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_socket_address.h" + +#ifndef SOL_UDP +#define SOL_UDP 17 +#endif + +#ifndef UDP_SEGMENT +#define UDP_SEGMENT 103 +#endif + +#ifndef UDP_MAX_SEGMENTS +#define UDP_MAX_SEGMENTS (1 << 6UL) +#endif + +#ifndef SO_TXTIME +#define SO_TXTIME 61 +#endif + +namespace quic { + +const int kCmsgSpaceForIpv4 = CMSG_SPACE(sizeof(in_pktinfo)); +const int kCmsgSpaceForIpv6 = CMSG_SPACE(sizeof(in6_pktinfo)); +// kCmsgSpaceForIp should be big enough to hold both IPv4 and IPv6 packet info. +const int kCmsgSpaceForIp = (kCmsgSpaceForIpv4 < kCmsgSpaceForIpv6) + ? kCmsgSpaceForIpv6 + : kCmsgSpaceForIpv4; + +const int kCmsgSpaceForSegmentSize = CMSG_SPACE(sizeof(uint16_t)); + +const int kCmsgSpaceForTxTime = CMSG_SPACE(sizeof(uint64_t)); + +const int kCmsgSpaceForTTL = CMSG_SPACE(sizeof(int)); + +// QuicMsgHdr is used to build msghdr objects that can be used send packets via +// ::sendmsg. +// +// Example: +// // cbuf holds control messages(cmsgs). The size is determined from what +// // cmsgs will be set for this msghdr. +// char cbuf[kCmsgSpaceForIp + kCmsgSpaceForSegmentSize]; +// QuicMsgHdr hdr(packet_buf, packet_buf_len, peer_addr, cbuf, sizeof(cbuf)); +// +// // Set IP in cmsgs. +// hdr.SetIpInNextCmsg(self_addr); +// +// // Set GSO size in cmsgs. +// *hdr.GetNextCmsgData<uint16_t>(SOL_UDP, UDP_SEGMENT) = 1200; +// +// QuicLinuxSocketUtils::WritePacket(fd, hdr); +class QUIC_EXPORT_PRIVATE QuicMsgHdr { + public: + QuicMsgHdr(const char* buffer, + size_t buf_len, + const QuicSocketAddress& peer_address, + char* cbuf, + size_t cbuf_size); + + // Set IP info in the next cmsg. Both IPv4 and IPv6 are supported. + void SetIpInNextCmsg(const QuicIpAddress& self_address); + + template <typename DataType> + DataType* GetNextCmsgData(int cmsg_level, int cmsg_type) { + return reinterpret_cast<DataType*>( + GetNextCmsgDataInternal(cmsg_level, cmsg_type, sizeof(DataType))); + } + + const msghdr* hdr() const { return &hdr_; } + + protected: + void* GetNextCmsgDataInternal(int cmsg_level, + int cmsg_type, + size_t data_size); + + msghdr hdr_; + iovec iov_; + sockaddr_storage raw_peer_address_; + char* cbuf_; + const size_t cbuf_size_; + // The last cmsg populated so far. nullptr means nothing has been populated. + cmsghdr* cmsg_; +}; + +// BufferedWrite holds all information needed to send a packet. +struct QUIC_EXPORT_PRIVATE BufferedWrite { + BufferedWrite(const char* buffer, + size_t buf_len, + const QuicIpAddress& self_address, + const QuicSocketAddress& peer_address) + : BufferedWrite(buffer, + buf_len, + self_address, + peer_address, + std::unique_ptr<PerPacketOptions>(), + /*release_time=*/0) {} + + BufferedWrite(const char* buffer, + size_t buf_len, + const QuicIpAddress& self_address, + const QuicSocketAddress& peer_address, + std::unique_ptr<PerPacketOptions> options, + uint64_t release_time) + : buffer(buffer), + buf_len(buf_len), + self_address(self_address), + peer_address(peer_address), + options(std::move(options)), + release_time(release_time) {} + + const char* buffer; // Not owned. + size_t buf_len; + QuicIpAddress self_address; + QuicSocketAddress peer_address; + std::unique_ptr<PerPacketOptions> options; + + // The release time according to the owning packet writer's clock, which is + // often not a QuicClock. Calculated from packet writer's Now() and the + // release time delay in |options|. + // 0 means it can be sent at the same time as the previous packet in a batch, + // or can be sent Now() if this is the first packet of a batch. + uint64_t release_time; +}; + +// QuicMMsgHdr is used to build mmsghdr objects that can be used to send +// multiple packets at once via ::sendmmsg. +// +// Example: +// QuicCircularDeque<BufferedWrite> buffered_writes; +// ... (Populate buffered_writes) ... +// +// QuicMMsgHdr mhdr( +// buffered_writes.begin(), buffered_writes.end(), kCmsgSpaceForIp, +// [](QuicMMsgHdr* mhdr, int i, const BufferedWrite& buffered_write) { +// mhdr->SetIpInNextCmsg(i, buffered_write.self_address); +// }); +// +// int num_packets_sent; +// QuicSocketUtils::WriteMultiplePackets(fd, &mhdr, &num_packets_sent); +class QUIC_EXPORT_PRIVATE QuicMMsgHdr { + public: + typedef std::function< + void(QuicMMsgHdr* mhdr, int i, const BufferedWrite& buffered_write)> + ControlBufferInitializer; + template <typename IteratorT> + QuicMMsgHdr(const IteratorT& first, + const IteratorT& last, + size_t cbuf_size, + ControlBufferInitializer cbuf_initializer) + : num_msgs_(std::distance(first, last)), cbuf_size_(cbuf_size) { + static_assert( + std::is_same<typename std::iterator_traits<IteratorT>::value_type, + BufferedWrite>::value, + "Must iterate over a collection of BufferedWrite."); + + DCHECK_LE(0, num_msgs_); + if (num_msgs_ == 0) { + return; + } + + storage_.reset(new char[StorageSize()]); + memset(&storage_[0], 0, StorageSize()); + + int i = -1; + for (auto it = first; it != last; ++it) { + ++i; + + InitOneHeader(i, *it); + if (cbuf_initializer) { + cbuf_initializer(this, i, *it); + } + } + } + + void SetIpInNextCmsg(int i, const QuicIpAddress& self_address); + + template <typename DataType> + DataType* GetNextCmsgData(int i, int cmsg_level, int cmsg_type) { + return reinterpret_cast<DataType*>( + GetNextCmsgDataInternal(i, cmsg_level, cmsg_type, sizeof(DataType))); + } + + mmsghdr* mhdr() { return GetMMsgHdr(0); } + + int num_msgs() const { return num_msgs_; } + + // Get the total number of bytes in the first |num_packets_sent| packets. + int num_bytes_sent(int num_packets_sent); + + protected: + void InitOneHeader(int i, const BufferedWrite& buffered_write); + + void* GetNextCmsgDataInternal(int i, + int cmsg_level, + int cmsg_type, + size_t data_size); + + size_t StorageSize() const { + return num_msgs_ * + (sizeof(mmsghdr) + sizeof(iovec) + sizeof(sockaddr_storage) + + sizeof(cmsghdr*) + cbuf_size_); + } + + mmsghdr* GetMMsgHdr(int i) { + auto* first = reinterpret_cast<mmsghdr*>(&storage_[0]); + return &first[i]; + } + + iovec* GetIov(int i) { + auto* first = reinterpret_cast<iovec*>(GetMMsgHdr(num_msgs_)); + return &first[i]; + } + + sockaddr_storage* GetPeerAddressStorage(int i) { + auto* first = reinterpret_cast<sockaddr_storage*>(GetIov(num_msgs_)); + return &first[i]; + } + + cmsghdr** GetCmsgHdr(int i) { + auto* first = reinterpret_cast<cmsghdr**>(GetPeerAddressStorage(num_msgs_)); + return &first[i]; + } + + char* GetCbuf(int i) { + auto* first = reinterpret_cast<char*>(GetCmsgHdr(num_msgs_)); + return &first[i * cbuf_size_]; + } + + const int num_msgs_; + // Size of cmsg buffer for each message. + const size_t cbuf_size_; + // storage_ holds the memory of + // |num_msgs_| mmsghdr + // |num_msgs_| iovec + // |num_msgs_| sockaddr_storage, for peer addresses + // |num_msgs_| cmsghdr* + // |num_msgs_| cbuf, each of size cbuf_size + std::unique_ptr<char[]> storage_; +}; + +class QUIC_EXPORT_PRIVATE QuicLinuxSocketUtils { + public: + // Return the UDP segment size of |fd|, 0 means segment size has not been set + // on this socket. If GSO is not supported, return -1. + static int GetUDPSegmentSize(int fd); + + // Enable release time on |fd|. + static bool EnableReleaseTime(int fd, clockid_t clockid); + + // If the msghdr contains an IP_TTL entry, this will set ttl to the correct + // value and return true. Otherwise it will return false. + static bool GetTtlFromMsghdr(struct msghdr* hdr, int* ttl); + + // Set IP(self_address) in |cmsg_data|. Does not touch other fields in the + // containing cmsghdr. + static void SetIpInfoInCmsgData(const QuicIpAddress& self_address, + void* cmsg_data); + + // A helper for WritePacket which fills in the cmsg with the supplied self + // address. + // Returns the length of the packet info structure used. + static size_t SetIpInfoInCmsg(const QuicIpAddress& self_address, + cmsghdr* cmsg); + + // Writes the packet in |hdr| to the socket, using ::sendmsg. + static WriteResult WritePacket(int fd, const QuicMsgHdr& hdr); + + // Writes the packets in |mhdr| to the socket, using ::sendmmsg if available. + static WriteResult WriteMultiplePackets(int fd, + QuicMMsgHdr* mhdr, + int* num_packets_sent); +}; + +} // namespace quic + +#endif // QUICHE_QUIC_PLATFORM_IMPL_QUIC_LINUX_SOCKET_UTILS_H_
diff --git a/quic/core/quic_linux_socket_utils_test.cc b/quic/core/quic_linux_socket_utils_test.cc new file mode 100644 index 0000000..32ad12f --- /dev/null +++ b/quic/core/quic_linux_socket_utils_test.cc
@@ -0,0 +1,329 @@ +// Copyright (c) 2019 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/third_party/quiche/src/quic/core/quic_linux_socket_utils.h" + +#include <netinet/in.h> +#include <stdint.h> +#include <cstddef> +#include <sstream> +#include <vector> + +#include <string> + +#include "net/third_party/quiche/src/quic/core/quic_circular_deque.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_test.h" +#include "net/third_party/quiche/src/quic/test_tools/quic_mock_syscall_wrapper.h" + +using testing::_; +using testing::InSequence; +using testing::Invoke; + +namespace quic { +namespace test { +namespace { + +class QuicLinuxSocketUtilsTest : public QuicTest { + protected: + WriteResult TestWriteMultiplePackets( + int fd, + const QuicCircularDeque<BufferedWrite>::const_iterator& first, + const QuicCircularDeque<BufferedWrite>::const_iterator& last, + int* num_packets_sent) { + QuicMMsgHdr mhdr( + first, last, kCmsgSpaceForIp, + [](QuicMMsgHdr* mhdr, int i, const BufferedWrite& buffered_write) { + mhdr->SetIpInNextCmsg(i, buffered_write.self_address); + }); + + WriteResult res = + QuicLinuxSocketUtils::WriteMultiplePackets(fd, &mhdr, num_packets_sent); + return res; + } + + MockQuicSyscallWrapper mock_syscalls_; + ScopedGlobalSyscallWrapperOverride syscall_override_{&mock_syscalls_}; +}; + +void CheckIpAndTtlInCbuf(msghdr* hdr, + const void* cbuf, + const QuicIpAddress& self_addr, + int ttl) { + const bool is_ipv4 = self_addr.IsIPv4(); + const size_t ip_cmsg_space = is_ipv4 ? kCmsgSpaceForIpv4 : kCmsgSpaceForIpv6; + + EXPECT_EQ(cbuf, hdr->msg_control); + EXPECT_EQ(ip_cmsg_space + CMSG_SPACE(sizeof(uint16_t)), hdr->msg_controllen); + + cmsghdr* cmsg = CMSG_FIRSTHDR(hdr); + EXPECT_EQ(cmsg->cmsg_len, is_ipv4 ? CMSG_LEN(sizeof(in_pktinfo)) + : CMSG_LEN(sizeof(in6_pktinfo))); + EXPECT_EQ(cmsg->cmsg_level, is_ipv4 ? IPPROTO_IP : IPPROTO_IPV6); + EXPECT_EQ(cmsg->cmsg_type, is_ipv4 ? IP_PKTINFO : IPV6_PKTINFO); + + const std::string& self_addr_str = self_addr.ToPackedString(); + if (is_ipv4) { + in_pktinfo* pktinfo = reinterpret_cast<in_pktinfo*>(CMSG_DATA(cmsg)); + EXPECT_EQ(0, memcmp(&pktinfo->ipi_spec_dst, self_addr_str.c_str(), + self_addr_str.length())); + } else { + in6_pktinfo* pktinfo = reinterpret_cast<in6_pktinfo*>(CMSG_DATA(cmsg)); + EXPECT_EQ(0, memcmp(&pktinfo->ipi6_addr, self_addr_str.c_str(), + self_addr_str.length())); + } + + cmsg = CMSG_NXTHDR(hdr, cmsg); + EXPECT_EQ(cmsg->cmsg_len, CMSG_LEN(sizeof(int))); + EXPECT_EQ(cmsg->cmsg_level, is_ipv4 ? IPPROTO_IP : IPPROTO_IPV6); + EXPECT_EQ(cmsg->cmsg_type, is_ipv4 ? IP_TTL : IPV6_HOPLIMIT); + EXPECT_EQ(ttl, *reinterpret_cast<int*>(CMSG_DATA(cmsg))); + + EXPECT_EQ(nullptr, CMSG_NXTHDR(hdr, cmsg)); +} + +void CheckMsghdrWithoutCbuf(const msghdr* hdr, + const void* buffer, + size_t buf_len, + const QuicSocketAddress& peer_addr) { + EXPECT_EQ( + peer_addr.host().IsIPv4() ? sizeof(sockaddr_in) : sizeof(sockaddr_in6), + hdr->msg_namelen); + sockaddr_storage peer_generic_addr = peer_addr.generic_address(); + EXPECT_EQ(0, memcmp(hdr->msg_name, &peer_generic_addr, hdr->msg_namelen)); + EXPECT_EQ(1u, hdr->msg_iovlen); + EXPECT_EQ(buffer, hdr->msg_iov->iov_base); + EXPECT_EQ(buf_len, hdr->msg_iov->iov_len); + EXPECT_EQ(0, hdr->msg_flags); + EXPECT_EQ(nullptr, hdr->msg_control); + EXPECT_EQ(0u, hdr->msg_controllen); +} + +void CheckIpAndGsoSizeInCbuf(msghdr* hdr, + const void* cbuf, + const QuicIpAddress& self_addr, + uint16_t gso_size) { + const bool is_ipv4 = self_addr.IsIPv4(); + const size_t ip_cmsg_space = is_ipv4 ? kCmsgSpaceForIpv4 : kCmsgSpaceForIpv6; + + EXPECT_EQ(cbuf, hdr->msg_control); + EXPECT_EQ(ip_cmsg_space + CMSG_SPACE(sizeof(uint16_t)), hdr->msg_controllen); + + cmsghdr* cmsg = CMSG_FIRSTHDR(hdr); + EXPECT_EQ(cmsg->cmsg_len, is_ipv4 ? CMSG_LEN(sizeof(in_pktinfo)) + : CMSG_LEN(sizeof(in6_pktinfo))); + EXPECT_EQ(cmsg->cmsg_level, is_ipv4 ? IPPROTO_IP : IPPROTO_IPV6); + EXPECT_EQ(cmsg->cmsg_type, is_ipv4 ? IP_PKTINFO : IPV6_PKTINFO); + + const std::string& self_addr_str = self_addr.ToPackedString(); + if (is_ipv4) { + in_pktinfo* pktinfo = reinterpret_cast<in_pktinfo*>(CMSG_DATA(cmsg)); + EXPECT_EQ(0, memcmp(&pktinfo->ipi_spec_dst, self_addr_str.c_str(), + self_addr_str.length())); + } else { + in6_pktinfo* pktinfo = reinterpret_cast<in6_pktinfo*>(CMSG_DATA(cmsg)); + EXPECT_EQ(0, memcmp(&pktinfo->ipi6_addr, self_addr_str.c_str(), + self_addr_str.length())); + } + + cmsg = CMSG_NXTHDR(hdr, cmsg); + EXPECT_EQ(cmsg->cmsg_len, CMSG_LEN(sizeof(uint16_t))); + EXPECT_EQ(cmsg->cmsg_level, SOL_UDP); + EXPECT_EQ(cmsg->cmsg_type, UDP_SEGMENT); + EXPECT_EQ(gso_size, *reinterpret_cast<uint16_t*>(CMSG_DATA(cmsg))); + + EXPECT_EQ(nullptr, CMSG_NXTHDR(hdr, cmsg)); +} + +TEST_F(QuicLinuxSocketUtilsTest, QuicMsgHdr) { + QuicSocketAddress peer_addr(QuicIpAddress::Loopback4(), 1234); + char packet_buf[1024]; + + QuicMsgHdr quic_hdr(packet_buf, sizeof(packet_buf), peer_addr, nullptr, 0); + CheckMsghdrWithoutCbuf(quic_hdr.hdr(), packet_buf, sizeof(packet_buf), + peer_addr); + + for (bool is_ipv4 : {true, false}) { + QuicIpAddress self_addr = + is_ipv4 ? QuicIpAddress::Loopback4() : QuicIpAddress::Loopback6(); + char cbuf[kCmsgSpaceForIp + kCmsgSpaceForTTL]; + QuicMsgHdr quic_hdr(packet_buf, sizeof(packet_buf), peer_addr, cbuf, + sizeof(cbuf)); + msghdr* hdr = const_cast<msghdr*>(quic_hdr.hdr()); + + EXPECT_EQ(nullptr, hdr->msg_control); + EXPECT_EQ(0u, hdr->msg_controllen); + + quic_hdr.SetIpInNextCmsg(self_addr); + EXPECT_EQ(cbuf, hdr->msg_control); + const size_t ip_cmsg_space = + is_ipv4 ? kCmsgSpaceForIpv4 : kCmsgSpaceForIpv6; + EXPECT_EQ(ip_cmsg_space, hdr->msg_controllen); + + if (is_ipv4) { + *quic_hdr.GetNextCmsgData<int>(IPPROTO_IP, IP_TTL) = 32; + } else { + *quic_hdr.GetNextCmsgData<int>(IPPROTO_IPV6, IPV6_HOPLIMIT) = 32; + } + + CheckIpAndTtlInCbuf(hdr, cbuf, self_addr, 32); + } +} + +TEST_F(QuicLinuxSocketUtilsTest, QuicMMsgHdr) { + QuicCircularDeque<BufferedWrite> buffered_writes; + char packet_buf1[1024]; + char packet_buf2[512]; + buffered_writes.emplace_back( + packet_buf1, sizeof(packet_buf1), QuicIpAddress::Loopback4(), + QuicSocketAddress(QuicIpAddress::Loopback4(), 4)); + buffered_writes.emplace_back( + packet_buf2, sizeof(packet_buf2), QuicIpAddress::Loopback6(), + QuicSocketAddress(QuicIpAddress::Loopback6(), 6)); + + QuicMMsgHdr quic_mhdr_without_cbuf(buffered_writes.begin(), + buffered_writes.end(), 0, nullptr); + for (size_t i = 0; i < buffered_writes.size(); ++i) { + const BufferedWrite& bw = buffered_writes[i]; + CheckMsghdrWithoutCbuf(&quic_mhdr_without_cbuf.mhdr()[i].msg_hdr, bw.buffer, + bw.buf_len, bw.peer_address); + } + + QuicMMsgHdr quic_mhdr_with_cbuf( + buffered_writes.begin(), buffered_writes.end(), + kCmsgSpaceForIp + kCmsgSpaceForSegmentSize, + [](QuicMMsgHdr* mhdr, int i, const BufferedWrite& buffered_write) { + mhdr->SetIpInNextCmsg(i, buffered_write.self_address); + *mhdr->GetNextCmsgData<uint16_t>(i, SOL_UDP, UDP_SEGMENT) = 1300; + }); + for (size_t i = 0; i < buffered_writes.size(); ++i) { + const BufferedWrite& bw = buffered_writes[i]; + msghdr* hdr = &quic_mhdr_with_cbuf.mhdr()[i].msg_hdr; + CheckIpAndGsoSizeInCbuf(hdr, hdr->msg_control, bw.self_address, 1300); + } +} + +TEST_F(QuicLinuxSocketUtilsTest, WriteMultiplePackets_NoPacketsToSend) { + int num_packets_sent; + QuicCircularDeque<BufferedWrite> buffered_writes; + + EXPECT_CALL(mock_syscalls_, Sendmmsg(_, _, _, _)).Times(0); + + EXPECT_EQ(WriteResult(WRITE_STATUS_ERROR, EINVAL), + TestWriteMultiplePackets(1, buffered_writes.begin(), + buffered_writes.end(), &num_packets_sent)); +} + +TEST_F(QuicLinuxSocketUtilsTest, WriteMultiplePackets_WriteBlocked) { + int num_packets_sent; + QuicCircularDeque<BufferedWrite> buffered_writes; + buffered_writes.emplace_back(nullptr, 0, QuicIpAddress(), + QuicSocketAddress(QuicIpAddress::Any4(), 0)); + + EXPECT_CALL(mock_syscalls_, Sendmmsg(_, _, _, _)) + .WillOnce(Invoke([](int /*fd*/, mmsghdr* /*msgvec*/, + unsigned int /*vlen*/, int /*flags*/) { + errno = EWOULDBLOCK; + return -1; + })); + + EXPECT_EQ(WriteResult(WRITE_STATUS_BLOCKED, EWOULDBLOCK), + TestWriteMultiplePackets(1, buffered_writes.begin(), + buffered_writes.end(), &num_packets_sent)); + EXPECT_EQ(0, num_packets_sent); +} + +TEST_F(QuicLinuxSocketUtilsTest, WriteMultiplePackets_WriteError) { + int num_packets_sent; + QuicCircularDeque<BufferedWrite> buffered_writes; + buffered_writes.emplace_back(nullptr, 0, QuicIpAddress(), + QuicSocketAddress(QuicIpAddress::Any4(), 0)); + + EXPECT_CALL(mock_syscalls_, Sendmmsg(_, _, _, _)) + .WillOnce(Invoke([](int /*fd*/, mmsghdr* /*msgvec*/, + unsigned int /*vlen*/, int /*flags*/) { + errno = EPERM; + return -1; + })); + + EXPECT_EQ(WriteResult(WRITE_STATUS_ERROR, EPERM), + TestWriteMultiplePackets(1, buffered_writes.begin(), + buffered_writes.end(), &num_packets_sent)); + EXPECT_EQ(0, num_packets_sent); +} + +TEST_F(QuicLinuxSocketUtilsTest, WriteMultiplePackets_WriteSuccess) { + int num_packets_sent; + QuicCircularDeque<BufferedWrite> buffered_writes; + const int kNumBufferedWrites = 10; + static_assert(kNumBufferedWrites < 256, "Must be less than 256"); + std::vector<std::string> buffer_holder; + for (int i = 0; i < kNumBufferedWrites; ++i) { + size_t buf_len = (i + 1) * 2; + std::ostringstream buffer_ostream; + while (buffer_ostream.str().length() < buf_len) { + buffer_ostream << i; + } + buffer_holder.push_back(buffer_ostream.str().substr(0, buf_len - 1) + '$'); + + buffered_writes.emplace_back(buffer_holder.back().data(), buf_len, + QuicIpAddress(), + QuicSocketAddress(QuicIpAddress::Any4(), 0)); + + // Leave the first self_address uninitialized. + if (i != 0) { + ASSERT_TRUE(buffered_writes.back().self_address.FromString("127.0.0.1")); + } + + std::ostringstream peer_ip_ostream; + QuicIpAddress peer_ip_address; + peer_ip_ostream << "127.0.1." << i + 1; + ASSERT_TRUE(peer_ip_address.FromString(peer_ip_ostream.str())); + buffered_writes.back().peer_address = + QuicSocketAddress(peer_ip_address, i + 1); + } + + InSequence s; + + for (int expected_num_packets_sent : {1, 2, 3, 10}) { + SCOPED_TRACE(testing::Message() + << "expected_num_packets_sent=" << expected_num_packets_sent); + EXPECT_CALL(mock_syscalls_, Sendmmsg(_, _, _, _)) + .WillOnce(Invoke( + [&](int /*fd*/, mmsghdr* msgvec, unsigned int vlen, int /*flags*/) { + EXPECT_LE(static_cast<unsigned int>(expected_num_packets_sent), + vlen); + for (unsigned int i = 0; i < vlen; ++i) { + const BufferedWrite& buffered_write = buffered_writes[i]; + const msghdr& hdr = msgvec[i].msg_hdr; + EXPECT_EQ(1u, hdr.msg_iovlen); + EXPECT_EQ(buffered_write.buffer, hdr.msg_iov->iov_base); + EXPECT_EQ(buffered_write.buf_len, hdr.msg_iov->iov_len); + sockaddr_storage expected_peer_address = + buffered_write.peer_address.generic_address(); + EXPECT_EQ(0, memcmp(&expected_peer_address, hdr.msg_name, + sizeof(sockaddr_storage))); + EXPECT_EQ(buffered_write.self_address.IsInitialized(), + hdr.msg_control != nullptr); + } + return expected_num_packets_sent; + })) + .RetiresOnSaturation(); + + int expected_bytes_written = 0; + for (auto it = buffered_writes.cbegin(); + it != buffered_writes.cbegin() + expected_num_packets_sent; ++it) { + expected_bytes_written += it->buf_len; + } + + EXPECT_EQ( + WriteResult(WRITE_STATUS_OK, expected_bytes_written), + TestWriteMultiplePackets(1, buffered_writes.cbegin(), + buffered_writes.cend(), &num_packets_sent)); + EXPECT_EQ(expected_num_packets_sent, num_packets_sent); + } +} + +} // namespace +} // namespace test +} // namespace quic
diff --git a/quic/core/quic_syscall_wrapper.cc b/quic/core/quic_syscall_wrapper.cc new file mode 100644 index 0000000..b2404c6 --- /dev/null +++ b/quic/core/quic_syscall_wrapper.cc
@@ -0,0 +1,49 @@ +// Copyright (c) 2019 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/third_party/quiche/src/quic/core/quic_syscall_wrapper.h" + +#include <atomic> +#include <cerrno> + +namespace quic { +namespace { +std::atomic<QuicSyscallWrapper*> global_syscall_wrapper(new QuicSyscallWrapper); +} // namespace + +ssize_t QuicSyscallWrapper::Sendmsg(int sockfd, const msghdr* msg, int flags) { + return ::sendmsg(sockfd, msg, flags); +} + +int QuicSyscallWrapper::Sendmmsg(int sockfd, + mmsghdr* msgvec, + unsigned int vlen, + int flags) { +#if defined(__linux__) && !defined(__ANDROID__) + return ::sendmmsg(sockfd, msgvec, vlen, flags); +#else + errno = ENOSYS; + return -1; +#endif +} + +QuicSyscallWrapper* GetGlobalSyscallWrapper() { + return global_syscall_wrapper.load(); +} + +void SetGlobalSyscallWrapper(QuicSyscallWrapper* wrapper) { + global_syscall_wrapper.store(wrapper); +} + +ScopedGlobalSyscallWrapperOverride::ScopedGlobalSyscallWrapperOverride( + QuicSyscallWrapper* wrapper_in_scope) + : original_wrapper_(GetGlobalSyscallWrapper()) { + SetGlobalSyscallWrapper(wrapper_in_scope); +} + +ScopedGlobalSyscallWrapperOverride::~ScopedGlobalSyscallWrapperOverride() { + SetGlobalSyscallWrapper(original_wrapper_); +} + +} // namespace quic
diff --git a/quic/core/quic_syscall_wrapper.h b/quic/core/quic_syscall_wrapper.h new file mode 100644 index 0000000..3a80ac6 --- /dev/null +++ b/quic/core/quic_syscall_wrapper.h
@@ -0,0 +1,49 @@ +// Copyright (c) 2019 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef QUICHE_QUIC_PLATFORM_IMPL_QUIC_SYSCALL_WRAPPER_H_ +#define QUICHE_QUIC_PLATFORM_IMPL_QUIC_SYSCALL_WRAPPER_H_ + +#include <sys/socket.h> +#include <sys/types.h> + +#include "net/third_party/quiche/src/quic/platform/api/quic_export.h" + +struct mmsghdr; +namespace quic { + +// QuicSyscallWrapper is a pass-through proxy to the real syscalls. +class QUIC_EXPORT_PRIVATE QuicSyscallWrapper { + public: + virtual ~QuicSyscallWrapper() = default; + + virtual ssize_t Sendmsg(int sockfd, const msghdr* msg, int flags); + + virtual int Sendmmsg(int sockfd, + mmsghdr* msgvec, + unsigned int vlen, + int flags); +}; + +// A global instance of QuicSyscallWrapper, used by some socket util functions. +QuicSyscallWrapper* GetGlobalSyscallWrapper(); + +// Change the global QuicSyscallWrapper to |wrapper|, for testing. +void SetGlobalSyscallWrapper(QuicSyscallWrapper* wrapper); + +// ScopedGlobalSyscallWrapperOverride changes the global QuicSyscallWrapper +// during its lifetime, for testing. +class QUIC_EXPORT_PRIVATE ScopedGlobalSyscallWrapperOverride { + public: + explicit ScopedGlobalSyscallWrapperOverride( + QuicSyscallWrapper* wrapper_in_scope); + ~ScopedGlobalSyscallWrapperOverride(); + + private: + QuicSyscallWrapper* original_wrapper_; +}; + +} // namespace quic + +#endif // QUICHE_QUIC_PLATFORM_IMPL_QUIC_SYSCALL_WRAPPER_H_
diff --git a/quic/test_tools/quic_mock_syscall_wrapper.cc b/quic/test_tools/quic_mock_syscall_wrapper.cc new file mode 100644 index 0000000..3cce97e --- /dev/null +++ b/quic/test_tools/quic_mock_syscall_wrapper.cc
@@ -0,0 +1,22 @@ +// Copyright (c) 2019 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "net/third_party/quiche/src/quic/test_tools/quic_mock_syscall_wrapper.h" + +using testing::_; +using testing::Invoke; + +namespace quic { +namespace test { + +MockQuicSyscallWrapper::MockQuicSyscallWrapper(QuicSyscallWrapper* delegate) { + ON_CALL(*this, Sendmsg(_, _, _)) + .WillByDefault(Invoke(delegate, &QuicSyscallWrapper::Sendmsg)); + + ON_CALL(*this, Sendmmsg(_, _, _, _)) + .WillByDefault(Invoke(delegate, &QuicSyscallWrapper::Sendmmsg)); +} + +} // namespace test +} // namespace quic
diff --git a/quic/test_tools/quic_mock_syscall_wrapper.h b/quic/test_tools/quic_mock_syscall_wrapper.h new file mode 100644 index 0000000..8dac5ad --- /dev/null +++ b/quic/test_tools/quic_mock_syscall_wrapper.h
@@ -0,0 +1,37 @@ +// Copyright (c) 2019 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef QUICHE_QUIC_PLATFORM_IMPL_QUIC_MOCK_SYSCALL_WRAPPER_H_ +#define QUICHE_QUIC_PLATFORM_IMPL_QUIC_MOCK_SYSCALL_WRAPPER_H_ + +#include "net/third_party/quiche/src/quic/core/quic_syscall_wrapper.h" +#include "net/third_party/quiche/src/quic/platform/api/quic_test.h" + +namespace quic { +namespace test { + +class MockQuicSyscallWrapper : public QuicSyscallWrapper { + public: + // Create a standard mock object. + MockQuicSyscallWrapper() = default; + + // Create a 'mockable' object that delegates everything to |delegate| by + // default. + explicit MockQuicSyscallWrapper(QuicSyscallWrapper* delegate); + + MOCK_METHOD(ssize_t, + Sendmsg, + (int sockfd, const msghdr*, int flags), + (override)); + + MOCK_METHOD(int, + Sendmmsg, + (int sockfd, mmsghdr*, unsigned int vlen, int flags), + (override)); +}; + +} // namespace test +} // namespace quic + +#endif // QUICHE_QUIC_PLATFORM_IMPL_QUIC_MOCK_SYSCALL_WRAPPER_H_