OgHttp2Session refactoring. * Delegates SubmitRequest/SubmitResponse to internal implementations. * Factors out some named methods for stream state operations related to message body handling. These named methods may become more complex in the future, so this is preparatory refactoring. PiperOrigin-RevId: 624309802
diff --git a/quiche/http2/adapter/oghttp2_session.cc b/quiche/http2/adapter/oghttp2_session.cc index 235b0fa..e5d8375 100644 --- a/quiche/http2/adapter/oghttp2_session.cc +++ b/quiche/http2/adapter/oghttp2_session.cc
@@ -407,7 +407,7 @@ bool OgHttp2Session::ResumeStream(Http2StreamId stream_id) { auto it = stream_map_.find(stream_id); - if (it == stream_map_.end() || it->second.outbound_body == nullptr || + if (it == stream_map_.end() || !HasMoreData(it->second) || !write_scheduler_.StreamRegistered(stream_id)) { return false; } @@ -652,6 +652,45 @@ return write_scheduler_.PopNextReadyStream(); } +int32_t OgHttp2Session::SubmitRequestInternal( + absl::Span<const Header> headers, + std::unique_ptr<DataFrameSource> data_source, void* user_data) { + // TODO(birenroy): return an error for the incorrect perspective + const Http2StreamId stream_id = next_stream_id_; + next_stream_id_ += 2; + if (!pending_streams_.empty() || !CanCreateStream()) { + // TODO(diannahu): There should probably be a limit to the number of allowed + // pending streams. + pending_streams_.insert( + {stream_id, PendingStreamState{ToHeaderBlock(headers), + std::move(data_source), user_data}}); + StartPendingStreams(); + } else { + StartRequest(stream_id, ToHeaderBlock(headers), std::move(data_source), + user_data); + } + return stream_id; +} + +int OgHttp2Session::SubmitResponseInternal( + Http2StreamId stream_id, absl::Span<const Header> headers, + std::unique_ptr<DataFrameSource> data_source) { + // TODO(birenroy): return an error for the incorrect perspective + auto iter = stream_map_.find(stream_id); + if (iter == stream_map_.end()) { + QUICHE_LOG(ERROR) << "Unable to find stream " << stream_id; + return -501; // NGHTTP2_ERR_INVALID_ARGUMENT + } + const bool end_stream = data_source == nullptr; + if (!end_stream) { + // Add data source to stream state + iter->second.outbound_body = std::move(data_source); + write_scheduler_.MarkStreamReady(stream_id, false); + } + SendHeaders(stream_id, ToHeaderBlock(headers), end_stream); + return 0; +} + OgHttp2Session::SendResult OgHttp2Session::MaybeSendBufferedData() { int64_t result = std::numeric_limits<int64_t>::max(); while (result > 0 && !buffered_data_.empty()) { @@ -797,19 +836,19 @@ if (reset_it != streams_reset_.end()) { // The stream has been reset; there's no point in sending DATA or trailing // HEADERS. - state.outbound_body = nullptr; + AbandonData(state); state.trailers = nullptr; return SendResult::SEND_OK; } SendResult connection_can_write = SendResult::SEND_OK; - if (state.outbound_body == nullptr || state.data_deferred) { + if (!IsReadyToWriteData(state)) { // No data to send, but there might be trailers. if (state.trailers != nullptr) { // Trailers will include END_STREAM, so the data source can be discarded. // Since data_deferred is true, there is no data waiting to be flushed for // this stream. - state.outbound_body = nullptr; + AbandonData(state); auto block_ptr = std::move(state.trailers); if (state.half_closed_local) { QUICHE_LOG(ERROR) << "Sent fin; can't send trailers."; @@ -827,7 +866,7 @@ std::min({connection_send_window_, state.send_window, static_cast<int32_t>(max_frame_payload_)}); while (connection_can_write == SendResult::SEND_OK && available_window > 0 && - state.outbound_body != nullptr && !state.data_deferred) { + IsReadyToWriteData(state)) { DataFrameInfo info = GetDataFrameInfo(stream_id, available_window, state); QUICHE_VLOG(2) << "WriteForStream | length: " << info.payload_length << " end_data: " << info.end_data @@ -900,13 +939,13 @@ SendTrailers(stream_id, std::move(*block_ptr)); } } - state.outbound_body = nullptr; + AbandonData(state); } } // If the stream still exists and has data to send, it should be marked as // ready in the write scheduler. if (stream_map_.contains(stream_id) && !state.data_deferred && - state.send_window > 0 && state.outbound_body != nullptr) { + state.send_window > 0 && HasMoreData(state)) { write_scheduler_.MarkStreamReady(stream_id, false); } // Streams can continue writing as long as the connection is not write-blocked @@ -946,40 +985,13 @@ int32_t OgHttp2Session::SubmitRequest( absl::Span<const Header> headers, std::unique_ptr<DataFrameSource> data_source, void* user_data) { - // TODO(birenroy): return an error for the incorrect perspective - const Http2StreamId stream_id = next_stream_id_; - next_stream_id_ += 2; - if (!pending_streams_.empty() || !CanCreateStream()) { - // TODO(diannahu): There should probably be a limit to the number of allowed - // pending streams. - pending_streams_.insert( - {stream_id, PendingStreamState{ToHeaderBlock(headers), - std::move(data_source), user_data}}); - StartPendingStreams(); - } else { - StartRequest(stream_id, ToHeaderBlock(headers), std::move(data_source), - user_data); - } - return stream_id; + return SubmitRequestInternal(headers, std::move(data_source), user_data); } int OgHttp2Session::SubmitResponse( Http2StreamId stream_id, absl::Span<const Header> headers, std::unique_ptr<DataFrameSource> data_source) { - // TODO(birenroy): return an error for the incorrect perspective - auto iter = stream_map_.find(stream_id); - if (iter == stream_map_.end()) { - QUICHE_LOG(ERROR) << "Unable to find stream " << stream_id; - return -501; // NGHTTP2_ERR_INVALID_ARGUMENT - } - const bool end_stream = data_source == nullptr; - if (!end_stream) { - // Add data source to stream state - iter->second.outbound_body = std::move(data_source); - write_scheduler_.MarkStreamReady(stream_id, false); - } - SendHeaders(stream_id, ToHeaderBlock(headers), end_stream); - return 0; + return SubmitResponseInternal(stream_id, headers, std::move(data_source)); } int OgHttp2Session::SubmitTrailer(Http2StreamId stream_id, @@ -1000,7 +1012,7 @@ << " already has trailers queued"; return -514; // NGHTTP2_ERR_INVALID_STREAM_STATE } - if (state.outbound_body == nullptr) { + if (!HasMoreData(state)) { // Enqueue trailers immediately. SendTrailers(stream_id, ToHeaderBlock(trailers)); } else { @@ -1215,7 +1227,7 @@ auto iter = stream_map_.find(stream_id); if (iter != stream_map_.end()) { iter->second.half_closed_remote = true; - iter->second.outbound_body = nullptr; + AbandonData(iter->second); } else if (static_cast<Http2StreamId>(stream_id) > highest_processed_stream_id_) { // Receiving RST_STREAM before HEADERS is a connection error. @@ -2039,6 +2051,18 @@ } } +bool OgHttp2Session::HasMoreData(const StreamState& stream_state) const { + return stream_state.outbound_body != nullptr; +} + +bool OgHttp2Session::IsReadyToWriteData(const StreamState& stream_state) const { + return stream_state.outbound_body != nullptr && !stream_state.data_deferred; +} + +void OgHttp2Session::AbandonData(StreamState& stream_state) { + stream_state.outbound_body = nullptr; +} + OgHttp2Session::DataFrameInfo OgHttp2Session::GetDataFrameInfo( Http2StreamId /*stream_id*/, size_t flow_control_available, StreamState& stream_state) {
diff --git a/quiche/http2/adapter/oghttp2_session.h b/quiche/http2/adapter/oghttp2_session.h index 580bef7..006383f 100644 --- a/quiche/http2/adapter/oghttp2_session.h +++ b/quiche/http2/adapter/oghttp2_session.h
@@ -365,6 +365,13 @@ // streams, returns zero. Http2StreamId GetNextReadyStream(); + int32_t SubmitRequestInternal(absl::Span<const Header> headers, + std::unique_ptr<DataFrameSource> data_source, + void* user_data); + int SubmitResponseInternal(Http2StreamId stream_id, + absl::Span<const Header> headers, + std::unique_ptr<DataFrameSource> data_source); + // Sends the buffered connection preface or serialized frame data, if any. SendResult MaybeSendBufferedData(); @@ -447,6 +454,17 @@ // initial window. void UpdateStreamReceiveWindowSizes(uint32_t new_value); + // Returns true if the given stream has additional data to write before + // trailers or the end of the stream. + bool HasMoreData(const StreamState& stream_state) const; + + // Returns true if the given stream has data ready to write. Trailers are + // considered separately. + bool IsReadyToWriteData(const StreamState& stream_state) const; + + // Abandons any remaining data, e.g. on stream reset. + void AbandonData(StreamState& stream_state); + // Gathers information required to construct a DATA frame header. struct DataFrameInfo { int64_t payload_length;