Write push promise on request streams.

This CL migrates push promises from the headers stream to request streams; the HTTP/3 push promise format is used in v99.

gfe-relnote: v99 only, not used in prod.
PiperOrigin-RevId: 260185991
Change-Id: I676cb4ebb71f72b321934c91d77ce4911f1d7225
diff --git a/quic/core/http/quic_spdy_stream.cc b/quic/core/http/quic_spdy_stream.cc
index ca1345d..c9fa852 100644
--- a/quic/core/http/quic_spdy_stream.cc
+++ b/quic/core/http/quic_spdy_stream.cc
@@ -124,23 +124,32 @@
     return stream_->OnHeadersFrameEnd();
   }
 
-  bool OnPushPromiseFrameStart(PushId /*push_id*/,
-                               Http3FrameLengths /*frame_length*/) override {
-    // TODO(b/137554973): Consume frame header.
-    CloseConnectionOnWrongFrame("Push Promise");
-    return false;
+  bool OnPushPromiseFrameStart(PushId push_id,
+                               Http3FrameLengths frame_length,
+                               QuicByteCount push_id_length) override {
+    if (!VersionHasStreamType(stream_->transport_version())) {
+      CloseConnectionOnWrongFrame("Push Promise");
+      return false;
+    }
+    return stream_->OnPushPromiseFrameStart(push_id, frame_length,
+                                            push_id_length);
   }
 
   bool OnPushPromiseFramePayload(QuicStringPiece payload) override {
-    // TODO(b/137554973): Consume frame payload.
     DCHECK(!payload.empty());
-    CloseConnectionOnWrongFrame("Push Promise");
-    return false;
+    if (!VersionUsesQpack(stream_->transport_version())) {
+      CloseConnectionOnWrongFrame("Push Promise");
+      return false;
+    }
+    return stream_->OnPushPromiseFramePayload(payload);
   }
 
   bool OnPushPromiseFrameEnd() override {
-    CloseConnectionOnWrongFrame("Push Promise");
-    return false;
+    if (!VersionUsesQpack(stream_->transport_version())) {
+      CloseConnectionOnWrongFrame("Push Promise");
+      return false;
+    }
+    return stream_->OnPushPromiseFrameEnd();
   }
 
   bool OnUnknownFrameStart(uint64_t /* frame_type */,
@@ -342,6 +351,35 @@
   return bytes_written;
 }
 
+void QuicSpdyStream::WritePushPromise(const PushPromiseFrame& frame) {
+  DCHECK(VersionUsesQpack(transport_version()));
+  std::unique_ptr<char[]> push_promise_frame_with_id;
+  const size_t push_promise_frame_length =
+      encoder_.SerializePushPromiseFrameWithOnlyPushId(
+          frame, &push_promise_frame_with_id);
+
+  unacked_frame_headers_offsets_.Add(send_buffer().stream_offset(),
+                                     send_buffer().stream_offset() +
+                                         push_promise_frame_length +
+                                         frame.headers.length());
+
+  // Write Push Promise frame header and push id.
+  QUIC_DLOG(INFO) << "Stream " << id()
+                  << " is writing Push Promise frame header of length "
+                  << push_promise_frame_length << " , with promised id "
+                  << frame.push_id;
+  WriteOrBufferData(QuicStringPiece(push_promise_frame_with_id.get(),
+                                    push_promise_frame_length),
+                    /* fin = */ false, /* ack_listener = */ nullptr);
+
+  // Write the promised request headers.
+  QUIC_DLOG(INFO) << "Stream " << id()
+                  << " is writing Push Promise request header of length "
+                  << frame.headers.length();
+  WriteOrBufferData(frame.headers, /* fin = */ false,
+                    /* ack_listener = */ nullptr);
+}
+
 QuicConsumedData QuicSpdyStream::WritevBody(const struct iovec* iov,
                                             int count,
                                             bool fin) {
@@ -898,11 +936,48 @@
   return !sequencer()->IsClosed() && !reading_stopped();
 }
 
+bool QuicSpdyStream::OnPushPromiseFrameStart(PushId push_id,
+                                             Http3FrameLengths frame_length,
+                                             QuicByteCount push_id_length) {
+  DCHECK(VersionHasStreamType(transport_version()));
+  DCHECK(!qpack_decoded_headers_accumulator_);
+
+  // TODO(renjietang): Check max push id and handle errors.
+  spdy_session_->OnPushPromise(id(), push_id);
+  sequencer()->MarkConsumed(
+      body_buffer_.OnNonBody(frame_length.header_length + push_id_length));
+
+  qpack_decoded_headers_accumulator_ =
+      QuicMakeUnique<QpackDecodedHeadersAccumulator>(
+          id(), spdy_session_->qpack_decoder(), this,
+          spdy_session_->max_inbound_header_list_size());
+
+  return true;
+}
+
+bool QuicSpdyStream::OnPushPromiseFramePayload(QuicStringPiece payload) {
+  spdy_session_->OnCompressedFrameSize(payload.length());
+  return OnHeadersFramePayload(payload);
+}
+
+bool QuicSpdyStream::OnPushPromiseFrameEnd() {
+  DCHECK(VersionUsesQpack(transport_version()));
+
+  OnHeadersFrameEnd();
+  return !sequencer()->IsClosed() && !reading_stopped();
+}
+
 void QuicSpdyStream::ProcessDecodedHeaders(const QuicHeaderList& headers) {
-  const QuicByteCount frame_length = headers_decompressed_
-                                         ? trailers_payload_length_
-                                         : headers_payload_length_;
-  OnStreamHeaderList(/* fin = */ false, frame_length, headers);
+  if (spdy_session_->promised_stream_id() ==
+      QuicUtils::GetInvalidStreamId(
+          session()->connection()->transport_version())) {
+    const QuicByteCount frame_length = headers_decompressed_
+                                           ? trailers_payload_length_
+                                           : headers_payload_length_;
+    OnStreamHeaderList(/* fin = */ false, frame_length, headers);
+  } else {
+    spdy_session_->OnHeaderList(headers);
+  }
   qpack_decoded_headers_accumulator_.reset();
 }