blob: 04f2cfd5611f5777e8d49802d27dc9e279194e3f [file] [log] [blame]
#include "http2/adapter/oghttp2_session.h"
#include <tuple>
#include "absl/strings/escaping.h"
#include "http2/adapter/oghttp2_util.h"
namespace http2 {
namespace adapter {
namespace {
const size_t kMaxMetadataFrameSize = 16384;
// TODO(birenroy): Consider incorporating spdy::FlagsSerializionVisitor here.
class FrameAttributeCollector : public spdy::SpdyFrameVisitor {
public:
FrameAttributeCollector() = default;
void VisitData(const spdy::SpdyDataIR& data) override {
frame_type_ = static_cast<uint8_t>(data.frame_type());
stream_id_ = data.stream_id();
length_ =
data.data_len() + (data.padded() ? 1 : 0) + data.padding_payload_len();
flags_ = (data.fin() ? 0x1 : 0) | (data.padded() ? 0x8 : 0);
}
void VisitHeaders(const spdy::SpdyHeadersIR& headers) override {
frame_type_ = static_cast<uint8_t>(headers.frame_type());
stream_id_ = headers.stream_id();
length_ = headers.size() - spdy::kFrameHeaderSize;
flags_ = 0x4 | (headers.fin() ? 0x1 : 0) | (headers.padded() ? 0x8 : 0) |
(headers.has_priority() ? 0x20 : 0);
}
void VisitPriority(const spdy::SpdyPriorityIR& priority) override {
frame_type_ = static_cast<uint8_t>(priority.frame_type());
frame_type_ = 2;
length_ = 5;
stream_id_ = priority.stream_id();
}
void VisitRstStream(const spdy::SpdyRstStreamIR& rst_stream) override {
frame_type_ = static_cast<uint8_t>(rst_stream.frame_type());
frame_type_ = 3;
length_ = 4;
stream_id_ = rst_stream.stream_id();
error_code_ = rst_stream.error_code();
}
void VisitSettings(const spdy::SpdySettingsIR& settings) override {
frame_type_ = static_cast<uint8_t>(settings.frame_type());
frame_type_ = 4;
length_ = 6 * settings.values().size();
flags_ = (settings.is_ack() ? 0x1 : 0);
}
void VisitPushPromise(const spdy::SpdyPushPromiseIR& push_promise) override {
frame_type_ = static_cast<uint8_t>(push_promise.frame_type());
frame_type_ = 5;
length_ = push_promise.size() - spdy::kFrameHeaderSize;
stream_id_ = push_promise.stream_id();
flags_ = (push_promise.padded() ? 0x8 : 0);
}
void VisitPing(const spdy::SpdyPingIR& ping) override {
frame_type_ = static_cast<uint8_t>(ping.frame_type());
frame_type_ = 6;
length_ = 8;
flags_ = (ping.is_ack() ? 0x1 : 0);
}
void VisitGoAway(const spdy::SpdyGoAwayIR& goaway) override {
frame_type_ = static_cast<uint8_t>(goaway.frame_type());
frame_type_ = 7;
length_ = goaway.size() - spdy::kFrameHeaderSize;
error_code_ = goaway.error_code();
}
void VisitWindowUpdate(
const spdy::SpdyWindowUpdateIR& window_update) override {
frame_type_ = static_cast<uint8_t>(window_update.frame_type());
frame_type_ = 8;
length_ = 4;
stream_id_ = window_update.stream_id();
}
void VisitContinuation(
const spdy::SpdyContinuationIR& continuation) override {
frame_type_ = static_cast<uint8_t>(continuation.frame_type());
stream_id_ = continuation.stream_id();
flags_ = continuation.end_headers() ? 0x4 : 0;
length_ = continuation.size() - spdy::kFrameHeaderSize;
}
void VisitUnknown(const spdy::SpdyUnknownIR& unknown) override {
frame_type_ = static_cast<uint8_t>(unknown.frame_type());
stream_id_ = unknown.stream_id();
flags_ = unknown.flags();
length_ = unknown.size() - spdy::kFrameHeaderSize;
}
void VisitAltSvc(const spdy::SpdyAltSvcIR& /*altsvc*/) override {}
void VisitPriorityUpdate(
const spdy::SpdyPriorityUpdateIR& /*priority_update*/) override {}
void VisitAcceptCh(const spdy::SpdyAcceptChIR& /*accept_ch*/) override {}
uint32_t stream_id() { return stream_id_; }
uint32_t length() { return length_; }
uint32_t error_code() { return error_code_; }
uint8_t frame_type() { return frame_type_; }
uint8_t flags() { return flags_; }
private:
uint32_t stream_id_ = 0;
uint32_t length_ = 0;
uint32_t error_code_ = 0;
uint8_t frame_type_ = 0;
uint8_t flags_ = 0;
};
} // namespace
void OgHttp2Session::PassthroughHeadersHandler::OnHeaderBlockStart() {
const bool status = visitor_.OnBeginHeadersForStream(stream_id_);
if (!status) {
result_ = Http2VisitorInterface::HEADER_CONNECTION_ERROR;
}
}
void OgHttp2Session::PassthroughHeadersHandler::OnHeader(
absl::string_view key,
absl::string_view value) {
if (result_ == Http2VisitorInterface::HEADER_OK) {
result_ = visitor_.OnHeaderForStream(stream_id_, key, value);
}
}
void OgHttp2Session::PassthroughHeadersHandler::OnHeaderBlockEnd(
size_t /* uncompressed_header_bytes */,
size_t /* compressed_header_bytes */) {
if (result_ == Http2VisitorInterface::HEADER_OK) {
visitor_.OnEndHeadersForStream(stream_id_);
} else {
session_.OnHeaderStatus(stream_id_, result_);
}
}
OgHttp2Session::OgHttp2Session(Http2VisitorInterface& visitor, Options options)
: visitor_(visitor),
headers_handler_(*this, visitor),
connection_window_manager_(kInitialFlowControlWindowSize,
[this](size_t window_update_delta) {
SendWindowUpdate(kConnectionStreamId,
window_update_delta);
}),
options_(options) {
decoder_.set_visitor(this);
decoder_.set_extension_visitor(this);
if (options_.perspective == Perspective::kServer) {
remaining_preface_ = {spdy::kHttp2ConnectionHeaderPrefix,
spdy::kHttp2ConnectionHeaderPrefixSize};
}
}
OgHttp2Session::~OgHttp2Session() {}
void OgHttp2Session::SetStreamUserData(Http2StreamId stream_id,
void* user_data) {
auto it = stream_map_.find(stream_id);
if (it != stream_map_.end()) {
it->second.user_data = user_data;
}
}
void* OgHttp2Session::GetStreamUserData(Http2StreamId stream_id) {
auto it = stream_map_.find(stream_id);
if (it != stream_map_.end()) {
return it->second.user_data;
}
return nullptr;
}
bool OgHttp2Session::ResumeStream(Http2StreamId stream_id) {
auto it = stream_map_.find(stream_id);
if (it->second.outbound_body == nullptr ||
!write_scheduler_.StreamRegistered(stream_id)) {
return false;
}
write_scheduler_.MarkStreamReady(stream_id, /*add_to_front=*/false);
return true;
}
int OgHttp2Session::GetStreamSendWindowSize(Http2StreamId stream_id) const {
auto it = stream_map_.find(stream_id);
if (it != stream_map_.end()) {
return it->second.send_window;
}
return -1;
}
int OgHttp2Session::GetStreamReceiveWindowLimit(Http2StreamId stream_id) const {
auto it = stream_map_.find(stream_id);
if (it != stream_map_.end()) {
return it->second.window_manager.WindowSizeLimit();
}
return -1;
}
int OgHttp2Session::GetStreamReceiveWindowSize(Http2StreamId stream_id) const {
auto it = stream_map_.find(stream_id);
if (it != stream_map_.end()) {
return it->second.window_manager.CurrentWindowSize();
}
return -1;
}
int OgHttp2Session::GetReceiveWindowSize() const {
return connection_window_manager_.CurrentWindowSize();
}
int OgHttp2Session::GetHpackEncoderDynamicTableSize() const {
const spdy::HpackEncoder* encoder = framer_.GetHpackEncoder();
return encoder == nullptr ? 0 : encoder->GetDynamicTableSize();
}
int OgHttp2Session::GetHpackDecoderDynamicTableSize() const {
const spdy::HpackDecoderAdapter* decoder = decoder_.GetHpackDecoder();
return decoder == nullptr ? 0 : decoder->GetDynamicTableSize();
}
ssize_t OgHttp2Session::ProcessBytes(absl::string_view bytes) {
ssize_t preface_consumed = 0;
if (!remaining_preface_.empty()) {
QUICHE_VLOG(2) << "Preface bytes remaining: " << remaining_preface_.size();
// decoder_ does not understand the client connection preface.
size_t min_size = std::min(remaining_preface_.size(), bytes.size());
if (!absl::StartsWith(remaining_preface_, bytes.substr(0, min_size))) {
// Preface doesn't match!
QUICHE_DLOG(INFO) << "Preface doesn't match! Expected: ["
<< absl::CEscape(remaining_preface_) << "], actual: ["
<< absl::CEscape(bytes) << "]";
visitor_.OnConnectionError();
return -1;
}
remaining_preface_.remove_prefix(min_size);
bytes.remove_prefix(min_size);
if (!remaining_preface_.empty()) {
QUICHE_VLOG(2) << "Preface bytes remaining: "
<< remaining_preface_.size();
return min_size;
}
preface_consumed = min_size;
}
ssize_t result = decoder_.ProcessInput(bytes.data(), bytes.size());
return result < 0 ? result : result + preface_consumed;
}
int OgHttp2Session::Consume(Http2StreamId stream_id, size_t num_bytes) {
auto it = stream_map_.find(stream_id);
if (it == stream_map_.end()) {
// TODO(b/181586191): LOG_ERROR rather than QUICHE_BUG.
QUICHE_BUG(stream_consume_notfound)
<< "Stream " << stream_id << " not found";
} else {
it->second.window_manager.MarkDataFlushed(num_bytes);
}
connection_window_manager_.MarkDataFlushed(num_bytes);
return 0; // Remove?
}
void OgHttp2Session::StartGracefulShutdown() {
if (options_.perspective == Perspective::kServer) {
if (!queued_goaway_) {
EnqueueFrame(absl::make_unique<spdy::SpdyGoAwayIR>(
std::numeric_limits<int32_t>::max(), spdy::ERROR_CODE_NO_ERROR,
"graceful_shutdown"));
}
} else {
QUICHE_LOG(ERROR) << "Graceful shutdown not needed for clients.";
}
}
void OgHttp2Session::EnqueueFrame(std::unique_ptr<spdy::SpdyFrameIR> frame) {
if (frame->frame_type() == spdy::SpdyFrameType::GOAWAY) {
queued_goaway_ = true;
} else if (frame->frame_type() == spdy::SpdyFrameType::RST_STREAM) {
streams_reset_.insert(frame->stream_id());
auto iter = stream_map_.find(frame->stream_id());
if (iter != stream_map_.end()) {
iter->second.half_closed_local = true;
}
}
frames_.push_back(std::move(frame));
}
int OgHttp2Session::Send() {
MaybeSetupPreface();
ssize_t result = std::numeric_limits<ssize_t>::max();
// Flush any serialized prefix.
while (result > 0 && !serialized_prefix_.empty()) {
result = visitor_.OnReadyToSend(serialized_prefix_);
if (result > 0) {
serialized_prefix_.erase(0, result);
}
}
if (!serialized_prefix_.empty()) {
return result < 0 ? result : 0;
}
bool continue_writing = SendQueuedFrames();
while (continue_writing && !connection_metadata_.empty()) {
continue_writing = SendMetadata(0, connection_metadata_);
}
// Wake streams for writes.
while (continue_writing && write_scheduler_.HasReadyStreams() &&
connection_send_window_ > 0) {
const Http2StreamId stream_id = write_scheduler_.PopNextReadyStream();
// TODO(birenroy): Add a return value to indicate write blockage, so streams
// aren't woken unnecessarily.
continue_writing = WriteForStream(stream_id);
}
if (continue_writing) {
SendQueuedFrames();
}
return 0;
}
bool OgHttp2Session::SendQueuedFrames() {
// Serialize and send frames in the queue.
while (!frames_.empty()) {
const auto& frame_ptr = frames_.front();
FrameAttributeCollector c;
frame_ptr->Visit(&c);
visitor_.OnBeforeFrameSent(c.frame_type(), c.stream_id(), c.length(),
c.flags());
spdy::SpdySerializedFrame frame = framer_.SerializeFrame(*frame_ptr);
const ssize_t result = visitor_.OnReadyToSend(absl::string_view(frame));
if (result < 0) {
visitor_.OnConnectionError();
return false;
} else if (result == 0) {
// Write blocked.
return false;
} else {
visitor_.OnFrameSent(c.frame_type(), c.stream_id(), c.length(), c.flags(),
c.error_code());
if (static_cast<FrameType>(c.frame_type()) == FrameType::RST_STREAM) {
// If this endpoint is resetting the stream, the stream should be
// closed. This endpoint is already aware of the outbound RST_STREAM and
// its error code, so close with NO_ERROR.
visitor_.OnCloseStream(c.stream_id(), Http2ErrorCode::NO_ERROR);
}
frames_.pop_front();
if (static_cast<size_t>(result) < frame.size()) {
// The frame was partially written, so the rest must be buffered.
serialized_prefix_.assign(frame.data() + result, frame.size() - result);
return false;
}
}
}
return true;
}
bool OgHttp2Session::WriteForStream(Http2StreamId stream_id) {
auto it = stream_map_.find(stream_id);
if (it == stream_map_.end()) {
QUICHE_LOG(ERROR) << "Can't find stream " << stream_id
<< " which is ready to write!";
return true;
}
StreamState& state = it->second;
bool connection_can_write = true;
if (!state.outbound_metadata.empty()) {
connection_can_write = SendMetadata(stream_id, state.outbound_metadata);
}
if (state.outbound_body == nullptr) {
// No data to send, but there might be trailers.
if (state.trailers != nullptr) {
auto block_ptr = std::move(state.trailers);
if (state.half_closed_local) {
QUICHE_LOG(ERROR) << "Sent fin; can't send trailers.";
} else {
SendTrailers(stream_id, std::move(*block_ptr));
MaybeCloseWithRstStream(stream_id, state);
}
}
return true;
}
bool source_can_produce = true;
int32_t available_window = std::min(
std::min(connection_send_window_, state.send_window), max_frame_payload_);
while (connection_can_write && available_window > 0 &&
state.outbound_body != nullptr) {
ssize_t length;
bool end_data;
std::tie(length, end_data) =
state.outbound_body->SelectPayloadLength(available_window);
if (length == 0 && !end_data) {
source_can_produce = false;
break;
} else if (length == DataFrameSource::kError) {
source_can_produce = false;
visitor_.OnCloseStream(stream_id, Http2ErrorCode::INTERNAL_ERROR);
break;
}
const bool fin = end_data ? state.outbound_body->send_fin() : false;
if (length > 0 || fin) {
spdy::SpdyDataIR data(stream_id);
data.set_fin(fin);
data.SetDataShallow(length);
spdy::SpdySerializedFrame header =
spdy::SpdyFramer::SerializeDataFrameHeaderWithPaddingLengthField(
data);
QUICHE_DCHECK(serialized_prefix_.empty() && frames_.empty());
const bool success =
state.outbound_body->Send(absl::string_view(header), length);
if (!success) {
connection_can_write = false;
break;
}
visitor_.OnFrameSent(/* DATA */ 0, stream_id, length, fin ? 0x1 : 0x0, 0);
connection_send_window_ -= length;
state.send_window -= length;
available_window =
std::min(std::min(connection_send_window_, state.send_window),
max_frame_payload_);
}
if (end_data) {
bool sent_trailers = false;
if (state.trailers != nullptr) {
auto block_ptr = std::move(state.trailers);
if (fin) {
QUICHE_LOG(ERROR) << "Sent fin; can't send trailers.";
} else {
SendTrailers(stream_id, std::move(*block_ptr));
sent_trailers = true;
}
}
state.outbound_body = nullptr;
if (fin || sent_trailers) {
MaybeCloseWithRstStream(stream_id, state);
}
}
}
// If the stream still has data to send, it should be marked as ready in the
// write scheduler.
if (source_can_produce && state.send_window > 0 &&
state.outbound_body != nullptr) {
write_scheduler_.MarkStreamReady(stream_id, false);
}
// Streams can continue writing as long as the connection is not write-blocked
// and there is additional flow control quota available.
return connection_can_write && available_window > 0;
}
bool OgHttp2Session::SendMetadata(Http2StreamId stream_id,
OgHttp2Session::MetadataSequence& sequence) {
auto payload_buffer = absl::make_unique<uint8_t[]>(kMaxMetadataFrameSize);
while (!sequence.empty()) {
MetadataSource& source = *sequence.front();
ssize_t written;
bool end_metadata;
std::tie(written, end_metadata) =
source.Pack(payload_buffer.get(), kMaxMetadataFrameSize);
if (written < 0) {
// Did not touch the connection, so perhaps writes are still possible.
return true;
}
QUICHE_DCHECK_LE(static_cast<size_t>(written), kMaxMetadataFrameSize);
auto payload = absl::string_view(
reinterpret_cast<const char*>(payload_buffer.get()), written);
EnqueueFrame(absl::make_unique<spdy::SpdyUnknownIR>(
stream_id, kMetadataFrameType, end_metadata ? kMetadataEndFlag : 0u,
std::string(payload)));
if (end_metadata) {
sequence.erase(sequence.begin());
}
}
return SendQueuedFrames();
}
int32_t OgHttp2Session::SubmitRequest(
absl::Span<const Header> headers,
std::unique_ptr<DataFrameSource> data_source, void* user_data) {
// TODO(birenroy): return an error for the incorrect perspective
const Http2StreamId stream_id = next_stream_id_;
next_stream_id_ += 2;
// Convert headers to header block, create headers frame.
auto frame =
absl::make_unique<spdy::SpdyHeadersIR>(stream_id, ToHeaderBlock(headers));
// Add data source and user data to stream state
auto iter = CreateStream(stream_id);
write_scheduler_.MarkStreamReady(stream_id, false);
if (data_source == nullptr) {
frame->set_fin(true);
iter->second.half_closed_local = true;
} else {
iter->second.outbound_body = std::move(data_source);
}
iter->second.user_data = user_data;
// Enqueue headers frame
EnqueueFrame(std::move(frame));
return stream_id;
}
int OgHttp2Session::SubmitResponse(
Http2StreamId stream_id, absl::Span<const Header> headers,
std::unique_ptr<DataFrameSource> data_source) {
// TODO(birenroy): return an error for the incorrect perspective
auto iter = stream_map_.find(stream_id);
if (iter == stream_map_.end()) {
QUICHE_LOG(ERROR) << "Unable to find stream " << stream_id;
return -501; // NGHTTP2_ERR_INVALID_ARGUMENT
}
// Convert headers to header block, create headers frame
auto frame =
absl::make_unique<spdy::SpdyHeadersIR>(stream_id, ToHeaderBlock(headers));
if (data_source == nullptr) {
frame->set_fin(true);
if (iter->second.half_closed_remote) {
visitor_.OnCloseStream(stream_id, Http2ErrorCode::NO_ERROR);
}
// TODO(birenroy): the server adapter should probably delete stream state
// when calling visitor_.OnCloseStream.
} else {
// Add data source to stream state
iter->second.outbound_body = std::move(data_source);
write_scheduler_.MarkStreamReady(stream_id, false);
}
EnqueueFrame(std::move(frame));
return 0;
}
int OgHttp2Session::SubmitTrailer(Http2StreamId stream_id,
absl::Span<const Header> trailers) {
// TODO(birenroy): Reject trailers when acting as a client?
auto iter = stream_map_.find(stream_id);
if (iter == stream_map_.end()) {
QUICHE_LOG(ERROR) << "Unable to find stream " << stream_id;
return -501; // NGHTTP2_ERR_INVALID_ARGUMENT
}
StreamState& state = iter->second;
if (state.half_closed_local) {
QUICHE_LOG(ERROR) << "Stream " << stream_id << " is half closed (local)";
return -514; // NGHTTP2_ERR_INVALID_STREAM_STATE
}
if (state.trailers != nullptr) {
QUICHE_LOG(ERROR) << "Stream " << stream_id
<< " already has trailers queued";
return -514; // NGHTTP2_ERR_INVALID_STREAM_STATE
}
if (state.outbound_body == nullptr) {
// Enqueue trailers immediately.
SendTrailers(stream_id, ToHeaderBlock(trailers));
MaybeCloseWithRstStream(stream_id, state);
} else {
QUICHE_LOG_IF(ERROR, state.outbound_body->send_fin())
<< "DataFrameSource will send fin, preventing trailers!";
// Save trailers so they can be written once data is done.
state.trailers =
absl::make_unique<spdy::SpdyHeaderBlock>(ToHeaderBlock(trailers));
write_scheduler_.MarkStreamReady(stream_id, false);
}
return 0;
}
void OgHttp2Session::SubmitMetadata(Http2StreamId stream_id,
std::unique_ptr<MetadataSource> source) {
if (stream_id == 0) {
connection_metadata_.push_back(std::move(source));
} else {
auto iter = CreateStream(stream_id);
iter->second.outbound_metadata.push_back(std::move(source));
write_scheduler_.MarkStreamReady(stream_id, false);
}
}
void OgHttp2Session::OnError(http2::Http2DecoderAdapter::SpdyFramerError error,
std::string detailed_error) {
QUICHE_VLOG(1) << "Error: "
<< http2::Http2DecoderAdapter::SpdyFramerErrorToString(error)
<< " details: " << detailed_error;
visitor_.OnConnectionError();
}
void OgHttp2Session::OnCommonHeader(spdy::SpdyStreamId stream_id,
size_t length,
uint8_t type,
uint8_t flags) {
highest_received_stream_id_ = std::max(static_cast<Http2StreamId>(stream_id),
highest_received_stream_id_);
visitor_.OnFrameHeader(stream_id, length, type, flags);
}
void OgHttp2Session::OnDataFrameHeader(spdy::SpdyStreamId stream_id,
size_t length, bool /*fin*/) {
visitor_.OnBeginDataForStream(stream_id, length);
}
void OgHttp2Session::OnStreamFrameData(spdy::SpdyStreamId stream_id,
const char* data,
size_t len) {
MarkDataBuffered(stream_id, len);
visitor_.OnDataForStream(stream_id, absl::string_view(data, len));
}
void OgHttp2Session::OnStreamEnd(spdy::SpdyStreamId stream_id) {
auto iter = stream_map_.find(stream_id);
if (iter != stream_map_.end()) {
iter->second.half_closed_remote = true;
}
visitor_.OnEndStream(stream_id);
if (iter != stream_map_.end() && iter->second.half_closed_local &&
options_.perspective == Perspective::kClient) {
// From the client's perspective, the stream can be closed if it's already
// half_closed_local.
visitor_.OnCloseStream(stream_id, Http2ErrorCode::NO_ERROR);
}
}
void OgHttp2Session::OnStreamPadLength(spdy::SpdyStreamId stream_id,
size_t value) {
MarkDataBuffered(stream_id, 1 + value);
// TODO(181586191): Pass padding to the visitor?
}
void OgHttp2Session::OnStreamPadding(spdy::SpdyStreamId /*stream_id*/, size_t
/*len*/) {
// Flow control was accounted for in OnStreamPadLength().
// TODO(181586191): Pass padding to the visitor?
}
spdy::SpdyHeadersHandlerInterface* OgHttp2Session::OnHeaderFrameStart(
spdy::SpdyStreamId stream_id) {
headers_handler_.set_stream_id(stream_id);
return &headers_handler_;
}
void OgHttp2Session::OnHeaderFrameEnd(spdy::SpdyStreamId /*stream_id*/) {
headers_handler_.set_stream_id(0);
}
void OgHttp2Session::OnRstStream(spdy::SpdyStreamId stream_id,
spdy::SpdyErrorCode error_code) {
auto iter = stream_map_.find(stream_id);
if (iter != stream_map_.end()) {
iter->second.half_closed_remote = true;
iter->second.outbound_body = nullptr;
write_scheduler_.UnregisterStream(stream_id);
}
visitor_.OnRstStream(stream_id, TranslateErrorCode(error_code));
// TODO(birenroy): Consider bundling "close stream" behavior into a dedicated
// method that also cleans up the stream map.
visitor_.OnCloseStream(stream_id, TranslateErrorCode(error_code));
}
void OgHttp2Session::OnSettings() {
visitor_.OnSettingsStart();
}
void OgHttp2Session::OnSetting(spdy::SpdySettingsId id, uint32_t value) {
visitor_.OnSetting({id, value});
if (id == kMetadataExtensionId) {
peer_supports_metadata_ = (value != 0);
}
}
void OgHttp2Session::OnSettingsEnd() {
visitor_.OnSettingsEnd();
auto settings = absl::make_unique<spdy::SpdySettingsIR>();
settings->set_is_ack(true);
EnqueueFrame(std::move(settings));
}
void OgHttp2Session::OnSettingsAck() {
visitor_.OnSettingsAck();
}
void OgHttp2Session::OnPing(spdy::SpdyPingId unique_id, bool is_ack) {
visitor_.OnPing(unique_id, is_ack);
}
void OgHttp2Session::OnGoAway(spdy::SpdyStreamId last_accepted_stream_id,
spdy::SpdyErrorCode error_code) {
received_goaway_ = true;
visitor_.OnGoAway(last_accepted_stream_id, TranslateErrorCode(error_code),
"");
}
bool OgHttp2Session::OnGoAwayFrameData(const char* /*goaway_data*/, size_t
/*len*/) {
// Opaque data is currently ignored.
return true;
}
void OgHttp2Session::OnHeaders(spdy::SpdyStreamId stream_id,
bool /*has_priority*/, int /*weight*/,
spdy::SpdyStreamId /*parent_stream_id*/,
bool /*exclusive*/, bool /*fin*/, bool /*end*/) {
if (options_.perspective == Perspective::kServer) {
CreateStream(stream_id);
}
}
void OgHttp2Session::OnWindowUpdate(spdy::SpdyStreamId stream_id,
int delta_window_size) {
if (stream_id == 0) {
connection_send_window_ += delta_window_size;
} else {
auto it = stream_map_.find(stream_id);
if (it == stream_map_.end()) {
QUICHE_VLOG(1) << "Stream " << stream_id << " not found!";
} else {
if (it->second.send_window == 0) {
// The stream was blocked on flow control.
write_scheduler_.MarkStreamReady(stream_id, false);
}
it->second.send_window += delta_window_size;
}
}
visitor_.OnWindowUpdate(stream_id, delta_window_size);
}
void OgHttp2Session::OnPushPromise(spdy::SpdyStreamId /*stream_id*/,
spdy::SpdyStreamId /*promised_stream_id*/,
bool /*end*/) {}
void OgHttp2Session::OnContinuation(spdy::SpdyStreamId /*stream_id*/, bool
/*end*/) {}
void OgHttp2Session::OnAltSvc(spdy::SpdyStreamId /*stream_id*/,
absl::string_view /*origin*/,
const spdy::SpdyAltSvcWireFormat::
AlternativeServiceVector& /*altsvc_vector*/) {
}
void OgHttp2Session::OnPriority(spdy::SpdyStreamId /*stream_id*/,
spdy::SpdyStreamId /*parent_stream_id*/,
int /*weight*/, bool /*exclusive*/) {}
void OgHttp2Session::OnPriorityUpdate(
spdy::SpdyStreamId /*prioritized_stream_id*/,
absl::string_view /*priority_field_value*/) {}
bool OgHttp2Session::OnUnknownFrame(spdy::SpdyStreamId /*stream_id*/,
uint8_t /*frame_type*/) {
return true;
}
void OgHttp2Session::OnHeaderStatus(
Http2StreamId stream_id, Http2VisitorInterface::OnHeaderResult result) {
QUICHE_DCHECK_NE(result, Http2VisitorInterface::HEADER_OK);
if (result == Http2VisitorInterface::HEADER_RST_STREAM) {
auto it = streams_reset_.find(stream_id);
if (it == streams_reset_.end()) {
EnqueueFrame(absl::make_unique<spdy::SpdyRstStreamIR>(
stream_id, spdy::ERROR_CODE_INTERNAL_ERROR));
}
} else if (result == Http2VisitorInterface::HEADER_CONNECTION_ERROR) {
visitor_.OnConnectionError();
}
}
bool OgHttp2Session::OnFrameHeader(spdy::SpdyStreamId stream_id, size_t length,
uint8_t type, uint8_t flags) {
if (type == kMetadataFrameType) {
QUICHE_DCHECK_EQ(metadata_length_, 0u);
visitor_.OnBeginMetadataForStream(stream_id, length);
metadata_stream_id_ = stream_id;
metadata_length_ = length;
end_metadata_ = flags & kMetadataEndFlag;
return true;
} else {
QUICHE_DLOG(INFO) << "Unexpected frame type " << static_cast<int>(type)
<< " received by the extension visitor.";
return false;
}
}
void OgHttp2Session::OnFramePayload(const char* data, size_t len) {
if (metadata_length_ > 0) {
QUICHE_DCHECK_LE(len, metadata_length_);
const bool success = visitor_.OnMetadataForStream(
metadata_stream_id_, absl::string_view(data, len));
if (success) {
metadata_length_ -= len;
if (metadata_length_ == 0 && end_metadata_) {
visitor_.OnMetadataEndForStream(metadata_stream_id_);
metadata_stream_id_ = 0;
end_metadata_ = false;
}
} else {
decoder_.StopProcessing();
}
} else {
QUICHE_DLOG(INFO) << "Unexpected metadata payload for stream "
<< metadata_stream_id_;
}
}
void OgHttp2Session::MaybeSetupPreface() {
if (!queued_preface_) {
if (options_.perspective == Perspective::kClient) {
serialized_prefix_.assign(spdy::kHttp2ConnectionHeaderPrefix,
spdy::kHttp2ConnectionHeaderPrefixSize);
}
// First frame must be a non-ack SETTINGS.
if (frames_.empty() ||
frames_.front()->frame_type() != spdy::SpdyFrameType::SETTINGS ||
reinterpret_cast<spdy::SpdySettingsIR*>(frames_.front().get())
->is_ack()) {
frames_.push_front(absl::make_unique<spdy::SpdySettingsIR>());
}
queued_preface_ = true;
}
}
void OgHttp2Session::SendWindowUpdate(Http2StreamId stream_id,
size_t update_delta) {
EnqueueFrame(
absl::make_unique<spdy::SpdyWindowUpdateIR>(stream_id, update_delta));
}
void OgHttp2Session::SendTrailers(Http2StreamId stream_id,
spdy::SpdyHeaderBlock trailers) {
auto frame =
absl::make_unique<spdy::SpdyHeadersIR>(stream_id, std::move(trailers));
frame->set_fin(true);
EnqueueFrame(std::move(frame));
}
void OgHttp2Session::MaybeCloseWithRstStream(Http2StreamId stream_id,
StreamState& state) {
state.half_closed_local = true;
if (options_.perspective == Perspective::kServer) {
if (state.half_closed_remote) {
visitor_.OnCloseStream(stream_id, Http2ErrorCode::NO_ERROR);
} else {
// Since the peer has not yet ended the stream, this endpoint should
// send a RST_STREAM NO_ERROR. See RFC 7540 Section 8.1.
EnqueueFrame(absl::make_unique<spdy::SpdyRstStreamIR>(
stream_id, spdy::SpdyErrorCode::ERROR_CODE_NO_ERROR));
// Enqueuing the RST_STREAM also invokes OnCloseStream.
}
// TODO(birenroy): the server adapter should probably delete stream state
// when calling visitor_.OnCloseStream.
}
}
void OgHttp2Session::MarkDataBuffered(Http2StreamId stream_id, size_t bytes) {
connection_window_manager_.MarkDataBuffered(bytes);
auto it = stream_map_.find(stream_id);
if (it != stream_map_.end()) {
it->second.window_manager.MarkDataBuffered(bytes);
}
}
OgHttp2Session::StreamStateMap::iterator OgHttp2Session::CreateStream(
Http2StreamId stream_id) {
WindowManager::WindowUpdateListener listener =
[this, stream_id](size_t window_update_delta) {
SendWindowUpdate(stream_id, window_update_delta);
};
absl::flat_hash_map<Http2StreamId, StreamState>::iterator iter;
bool inserted;
std::tie(iter, inserted) = stream_map_.try_emplace(
stream_id,
StreamState(stream_receive_window_limit_, std::move(listener)));
if (inserted) {
// Add the stream to the write scheduler.
const WriteScheduler::StreamPrecedenceType precedence(3);
write_scheduler_.RegisterStream(stream_id, precedence);
}
return iter;
}
} // namespace adapter
} // namespace http2