Fix an xSan error in //third_party/http2/adapter:oghttp2_session_test.

The error occurs as a use-after-free because a reference to stream state is
still used even after the stream is closed.

The solution is to break and avoid accessing the closed-stream state.

Tested:
blaze test //third_party/http2/adapter:oghttp2_session_test --config=asan --test_filter=OgHttp2SessionTest.ServerQueuesTrailersWithResponse

Before: http://sponge2/86feee4d-3579-4517-8bd5-1a82ac98d783

After: http://sponge2/5cc1a3aa-6e7f-436d-b15a-c98c9737cb00
PiperOrigin-RevId: 399443519
diff --git a/http2/adapter/oghttp2_session.cc b/http2/adapter/oghttp2_session.cc
index e09ac21..4359bbf 100644
--- a/http2/adapter/oghttp2_session.cc
+++ b/http2/adapter/oghttp2_session.cc
@@ -399,6 +399,7 @@
     } else if (length == DataFrameSource::kError) {
       source_can_produce = false;
       CloseStream(stream_id, Http2ErrorCode::INTERNAL_ERROR);
+      // No more work on the stream; it has been closed.
       break;
     }
     const bool fin = end_data ? state.outbound_body->send_fin() : false;
@@ -435,14 +436,17 @@
       }
       state.outbound_body = nullptr;
       if (fin || sent_trailers) {
-        MaybeCloseWithRstStream(stream_id, state);
+        if (MaybeCloseWithRstStream(stream_id, state)) {
+          // No more work on the stream; it has been closed.
+          break;
+        }
       }
     }
   }
-  // If the stream still has data to send, it should be marked as ready in the
-  // write scheduler.
-  if (source_can_produce && state.send_window > 0 &&
-      state.outbound_body != nullptr) {
+  // If the stream still exists and has data to send, it should be marked as
+  // ready in the write scheduler.
+  if (stream_map_.contains(stream_id) && source_can_produce &&
+      state.send_window > 0 && state.outbound_body != nullptr) {
     write_scheduler_.MarkStreamReady(stream_id, false);
   }
   // Streams can continue writing as long as the connection is not write-blocked
@@ -842,20 +846,22 @@
   EnqueueFrame(std::move(frame));
 }
 
-void OgHttp2Session::MaybeCloseWithRstStream(Http2StreamId stream_id,
+bool OgHttp2Session::MaybeCloseWithRstStream(Http2StreamId stream_id,
                                              StreamState& state) {
   state.half_closed_local = true;
   if (options_.perspective == Perspective::kServer) {
     if (state.half_closed_remote) {
       CloseStream(stream_id, Http2ErrorCode::NO_ERROR);
+      return true;
     } else {
       // Since the peer has not yet ended the stream, this endpoint should
       // send a RST_STREAM NO_ERROR. See RFC 7540 Section 8.1.
       EnqueueFrame(absl::make_unique<spdy::SpdyRstStreamIR>(
           stream_id, spdy::SpdyErrorCode::ERROR_CODE_NO_ERROR));
-      // Enqueuing the RST_STREAM also invokes OnCloseStream.
+      // Sending the RST_STREAM also invokes OnCloseStream.
     }
   }
+  return false;
 }
 
 void OgHttp2Session::MarkDataBuffered(Http2StreamId stream_id, size_t bytes) {
diff --git a/http2/adapter/oghttp2_session.h b/http2/adapter/oghttp2_session.h
index 56f0621..916ef08 100644
--- a/http2/adapter/oghttp2_session.h
+++ b/http2/adapter/oghttp2_session.h
@@ -235,8 +235,8 @@
   void SendTrailers(Http2StreamId stream_id, spdy::SpdyHeaderBlock trailers);
 
   // Encapsulates the RST_STREAM NO_ERROR behavior described in RFC 7540
-  // Section 8.1.
-  void MaybeCloseWithRstStream(Http2StreamId stream_id, StreamState& state);
+  // Section 8.1. Returns true if the stream is closed.
+  bool MaybeCloseWithRstStream(Http2StreamId stream_id, StreamState& state);
 
   // Performs flow control accounting for data sent by the peer.
   void MarkDataBuffered(Http2StreamId stream_id, size_t bytes);