| commit 2e96fbf1e851242f8028af2cbc16dbc96e1037ff |
| Author: Eric Anderson <ejona@google.com> |
| Date: Tue Jul 15 15:00:24 2025 -0700 |
| |
| netty: Associate netty stream eagerly to avoid client hang |
| |
| In #12185, RPCs were randomly hanging. In #12207 this was tracked down |
| to the headers promise completing successfully, but the netty stream |
| was null. This was because the headers write hadn't completed but |
| stream.close() had been called by goingAway(). |
| |
| diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java |
| index 276fa623c..d6bb37904 100644 |
| --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java |
| +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java |
| @@ -773,6 +773,19 @@ class NettyClientHandler extends AbstractNettyHandler { |
| } |
| } |
| }); |
| + // When the HEADERS are not buffered because of MAX_CONCURRENT_STREAMS in |
| + // StreamBufferingEncoder, the stream is created immediately even if the bytes of the HEADERS |
| + // are delayed because the OS may have too much buffered and isn't accepting the write. The |
| + // write promise is also delayed until flush(). However, we need to associate the netty stream |
| + // with the transport state so that goingAway() and forcefulClose() and able to notify the |
| + // stream of failures. |
| + // |
| + // This leaves a hole when MAX_CONCURRENT_STREAMS is reached, as http2Stream will be null, but |
| + // it is better than nothing. |
| + Http2Stream http2Stream = connection().stream(streamId); |
| + if (http2Stream != null) { |
| + http2Stream.setProperty(streamKey, stream); |
| + } |
| } |
| |
| /** |
| diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java |
| index dd4fcb4ea..5a2605eea 100644 |
| --- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java |
| +++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java |
| @@ -453,6 +453,26 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand |
| assertTrue(future.isDone()); |
| } |
| |
| + @Test |
| + public void receivedAbruptGoAwayShouldFailRacingQueuedIoStreamid() throws Exception { |
| + // Purposefully avoid flush(), since we want the write to not actually complete. |
| + // EmbeddedChannel doesn't support flow control, so this is the next closest approximation. |
| + ChannelFuture future = channel().write( |
| + newCreateStreamCommand(grpcHeaders, streamTransportState)); |
| + // Read a GOAWAY that indicates our stream can't be sent |
| + channelRead(goAwayFrame(0, 0 /* NO_ERROR */, Unpooled.copiedBuffer("this is a test", UTF_8))); |
| + |
| + ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class); |
| + verify(streamListener).closed(captor.capture(), same(REFUSED), |
| + ArgumentMatchers.<Metadata>notNull()); |
| + assertEquals(Status.UNAVAILABLE.getCode(), captor.getValue().getCode()); |
| + assertEquals( |
| + "Abrupt GOAWAY closed sent stream. HTTP/2 error code: NO_ERROR, " |
| + + "debug data: this is a test", |
| + captor.getValue().getDescription()); |
| + assertTrue(future.isDone()); |
| + } |
| + |
| @Test |
| public void receivedGoAway_shouldFailBufferedStreamsExceedingMaxConcurrentStreams() |
| throws Exception { |