Bugfix: Split large RemoteOutputService gRPC message (https://github.com/bazelbuild/bazel/pull/28758)

### Description
Actions that output directories with many files make Bazel send `RemoteOutputService` gRPC messages of unbounded size. Split these requests into batches limited to `maxOutboundMessageSize`.

### Motivation
Large directory outputs from actions, in combination with the Remote Output Service, fail the build with:
```
ERROR: /code/.../BUILD.bazel:3:16: Action large_directory_ failed: (Exit 34): RESOURCE_EXHAUSTED: grpc: received message larger than max (18681338 vs. 4194304)
Target //:large_directory failed to build
```
Example `rules.bzl`:
```
def _large_directory_impl(ctx):
    output_dir = ctx.actions.declare_directory(ctx.label.name + "_")
    args = ctx.actions.args()
    args.add_all([output_dir], expand_directories = False)

    ctx.actions.run_shell(
        outputs = [output_dir],
        command = """
cd $1
for i in $(seq 1 100); do
    mkdir $i
    for j in $(seq 1 1000); do
        echo $i-$j > $i/$j
    done
done
""",
        arguments = [args],
        execution_requirements = {"supports-path-mapping": "1"},
    )
    return DefaultInfo(files = depset([output_dir]))

large_directory = rule(
    implementation = _large_directory_impl,
)
```
Example `BUILD.bazel`:
```
load("//:rules.bzl", "large_directory")

large_directory(
    name = "large_directory",
)
```

RELNOTES: None

Closes #28758.
Closes #24705.

PiperOrigin-RevId: 925246636
Change-Id: I1c882a73fb2dc11313de7dd17b5a8d9179736eac
diff --git a/src/main/java/com/google/devtools/build/lib/remote/BazelOutputService.java b/src/main/java/com/google/devtools/build/lib/remote/BazelOutputService.java
index 0698c36..33de787 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/BazelOutputService.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/BazelOutputService.java
@@ -82,6 +82,9 @@
   private final String remoteCache;
   private final String remoteInstanceName;
   private final String remoteOutputServiceOutputPathPrefix;
+  private final int maxOutboundMessageSize;
+  private final int stageArtifactsRequestArtifactWithoutPathSize;
+  private final int finalizeArtifactsRequestArtifactWithoutPathSize;
   private final boolean verboseFailures;
   private final RemoteRetrier retrier;
   private final ReferenceCountedChannel channel;
@@ -94,10 +97,11 @@
       Path outputBase,
       Supplier<Path> execRootSupplier,
       Supplier<Path> outputPathSupplier,
-      DigestFunction.Value digestFunction,
+      DigestUtil digestUtil,
       String remoteCache,
       String remoteInstanceName,
       String remoteOutputServiceOutputPathPrefix,
+      int maxOutboundMessageSize,
       boolean verboseFailures,
       RemoteRetrier retrier,
       ReferenceCountedChannel channel,
@@ -105,10 +109,15 @@
     this.outputBaseId = DigestUtil.hashCodeToString(md5().hashString(outputBase.toString(), UTF_8));
     this.execRootSupplier = execRootSupplier;
     this.outputPathSupplier = outputPathSupplier;
-    this.digestFunction = digestFunction;
+    this.digestFunction = digestUtil.getDigestFunction();
     this.remoteCache = remoteCache;
     this.remoteInstanceName = remoteInstanceName;
     this.remoteOutputServiceOutputPathPrefix = remoteOutputServiceOutputPathPrefix;
+    this.maxOutboundMessageSize = maxOutboundMessageSize;
+    this.stageArtifactsRequestArtifactWithoutPathSize =
+        computeStageArtifactsRequestArtifactWithoutPathSize(digestUtil);
+    this.finalizeArtifactsRequestArtifactWithoutPathSize =
+        computeFinalizeArtifactsRequestArtifactWithoutPathSize(digestUtil);
     this.verboseFailures = verboseFailures;
     this.retrier = retrier;
     this.channel = channel;
@@ -277,33 +286,71 @@
     var outputPath = outputPathSupplier.get();
     var request = StageArtifactsRequest.newBuilder();
     request.setBuildId(buildId);
-    for (var file : files) {
-      request.addArtifacts(
-          StageArtifactsRequest.Artifact.newBuilder()
-              .setPath(file.path().relativeTo(outputPath).toString())
-              .setLocator(
-                  Any.pack(FileArtifactLocator.newBuilder().setDigest(file.digest()).build()))
-              .build());
-    }
-    var response = stageArtifacts(request.build());
-    if (response.getResponsesCount() != files.size()) {
-      throw new IOException(
-          String.format(
-              "StageArtifacts failed: expect %s responses from StageArtifactsResponse, got %s",
-              files.size(), response.getResponsesCount()));
-    }
+    final int initialRequestSize = request.build().getSerializedSize();
 
-    for (var i = 0; i < files.size(); ++i) {
-      var fileResponse = response.getResponses(i);
-      if (fileResponse.getStatus().getCode() != Status.Code.OK.value()) {
+    // Split into batches to avoid exceeding the gRPC message size limit.
+    while (!files.isEmpty()) {
+      request.clearArtifacts();
+      int requestSize = initialRequestSize;
+      int endIdx;
+      for (endIdx = 0; endIdx < files.size(); ++endIdx) {
+        var file = files.get(endIdx);
+        var path = file.path().relativeTo(outputPath).toString();
+        requestSize += stageArtifactsRequestArtifactWithoutPathSize + path.length();
+        if (endIdx > 0 && requestSize > maxOutboundMessageSize) {
+          break;
+        }
+        request.addArtifacts(
+            StageArtifactsRequest.Artifact.newBuilder()
+                .setPath(path)
+                .setLocator(
+                    Any.pack(FileArtifactLocator.newBuilder().setDigest(file.digest()).build()))
+                .build());
+      }
+      // Send only this part of the list to avoid overly large gRPC messages.
+      var filesInRequest = files.subList(0, endIdx);
+      files = files.subList(endIdx, files.size());
+
+      var response = stageArtifacts(request.build());
+      if (response.getResponsesCount() != filesInRequest.size()) {
         throw new IOException(
             String.format(
-                "Failed to stage %s, code: %s",
-                files.get(i).path().relativeTo(outputPath), fileResponse.getStatus()));
+                "StageArtifacts failed: expect %s responses from StageArtifactsResponse, got %s",
+                filesInRequest.size(), response.getResponsesCount()));
+      }
+
+      for (var i = 0; i < filesInRequest.size(); ++i) {
+        var fileResponse = response.getResponses(i);
+        if (fileResponse.getStatus().getCode() != Status.Code.OK.value()) {
+          throw new IOException(
+              String.format(
+                  "Failed to stage %s, code: %s",
+                  filesInRequest.get(i).path().relativeTo(outputPath), fileResponse.getStatus()));
+        }
       }
     }
   }
 
+  private static int computeStageArtifactsRequestArtifactWithoutPathSize(DigestUtil digestUtil) {
+    // Size of a sample artifact (one-character path, digest of a one-byte blob) plus 7 bytes of
+    // slack so the estimate does not run low: 1 byte for the repeated field tag, 1-2 bytes for
+    // the nested message length prefix, and a few spare bytes for varint growth (e.g. path).
+    // Assumes all non-empty digests serialize to the same size, as with fixed-length hashes.
+    // NOTE(review): callers add path.length() (UTF-16 units, not UTF-8 bytes); check non-ASCII.
+    final int overhead = 7;
+    final int stageArtifactsRequestArtifactWithoutPathSize =
+        StageArtifactsRequest.Artifact.newBuilder()
+            .setPath("p")
+            .setLocator(
+                Any.pack(
+                    FileArtifactLocator.newBuilder()
+                        .setDigest(digestUtil.compute(new byte[] {1}))
+                        .build()))
+            .build()
+            .getSerializedSize();
+    return overhead + stageArtifactsRequestArtifactWithoutPathSize;
+  }
+
   private StageArtifactsResponse stageArtifacts(StageArtifactsRequest request)
       throws IOException, InterruptedException {
     return retrier.execute(
@@ -358,32 +405,89 @@
                 }));
   }
 
+  private static int computeFinalizeArtifactsRequestArtifactWithoutPathSize(DigestUtil digestUtil) {
+    // Size of a sample artifact (one-character path, digest of a one-byte blob) plus 7 bytes of
+    // slack so the estimate does not run low: 1 byte for the repeated field tag, 1-2 bytes for
+    // the nested message length prefix, and a few spare bytes for varint growth (e.g. path).
+    // Assumes all non-empty digests serialize to the same size, as with fixed-length hashes.
+    // NOTE(review): callers add path.length() (UTF-16 units, not UTF-8 bytes); check non-ASCII.
+    final int overhead = 7;
+    final int finalizeArtifactsRequestArtifactWithoutPathSize =
+        FinalizeArtifactsRequest.Artifact.newBuilder()
+            .setPath("p")
+            .setLocator(
+                Any.pack(
+                    FileArtifactLocator.newBuilder()
+                        .setDigest(digestUtil.compute(new byte[] {1}))
+                        .build()))
+            .build()
+            .getSerializedSize();
+    return overhead + finalizeArtifactsRequestArtifactWithoutPathSize;
+  }
+
   @Override
   public void finalizeAction(Action action, OutputMetadataStore outputMetadataStore)
       throws IOException, InterruptedException {
     var execRoot = execRootSupplier.get();
     var outputPath = outputPathSupplier.get();
 
-    var request = FinalizeArtifactsRequest.newBuilder();
-    request.setBuildId(buildId);
+    // Batches artifacts into partial lists so that no single message grows too large.
+    class RequestSizeLimitingActionFinalizer {
+      final FinalizeArtifactsRequest.Builder builder;
+      int requestSize;
+      final int initialRequestSize;
+
+      RequestSizeLimitingActionFinalizer() {
+        builder = FinalizeArtifactsRequest.newBuilder();
+        builder.setBuildId(buildId);
+        initialRequestSize = builder.build().getSerializedSize();
+        requestSize = initialRequestSize;
+      }
+
+      void addArtifact(Artifact output) throws IOException, InterruptedException {
+        checkState(!output.isTreeArtifact());
+        var metadata = outputMetadataStore.getOutputMetadata(output);
+        if (metadata.getType().isFile()) {
+          var digest = DigestUtil.buildDigest(metadata.getDigest(), metadata.getSize());
+          var path = execRoot.getRelative(output.getExecPath()).relativeTo(outputPath).toString();
+          final int entrySize = finalizeArtifactsRequestArtifactWithoutPathSize + path.length();
+          if (builder.getArtifactsCount() > 0 && requestSize + entrySize > maxOutboundMessageSize) {
+            // Flush what is pending first so this message does not grow too large.
+            sendPendingRequest();
+          }
+          requestSize += entrySize;
+          builder.addArtifacts(
+              FinalizeArtifactsRequest.Artifact.newBuilder()
+                  .setPath(path)
+                  .setLocator(Any.pack(FileArtifactLocator.newBuilder().setDigest(digest).build()))
+                  .build());
+        }
+      }
+
+      void sendPendingRequest() throws IOException, InterruptedException {
+        var unused = finalizeArtifacts(builder.build());
+        builder.clearArtifacts();
+        requestSize = initialRequestSize;
+      }
+    }
+
+    var request = new RequestSizeLimitingActionFinalizer();
     for (var output : action.getOutputs()) {
       if (outputMetadataStore.artifactOmitted(output)) {
         continue;
       }
-
       if (output.isTreeArtifact()) {
         // TODO(chiwang): Use TreeArtifactLocator
         var children =
             outputMetadataStore.getTreeArtifactValue((SpecialArtifact) output).getChildren();
         for (var child : children) {
-          addArtifact(outputMetadataStore, execRoot, outputPath, request, child);
+          request.addArtifact(child);
         }
       } else {
-        addArtifact(outputMetadataStore, execRoot, outputPath, request, output);
+        request.addArtifact(output);
       }
     }
-
-    var unused = finalizeArtifacts(request.build());
+    request.sendPendingRequest();
   }
 
   private FinalizeArtifactsResponse finalizeArtifacts(FinalizeArtifactsRequest request)
@@ -402,26 +506,6 @@
                 }));
   }
 
-  private static void addArtifact(
-      OutputMetadataStore outputMetadataStore,
-      Path execRoot,
-      Path outputPath,
-      FinalizeArtifactsRequest.Builder builder,
-      Artifact output)
-      throws IOException, InterruptedException {
-    checkState(!output.isTreeArtifact());
-    var metadata = outputMetadataStore.getOutputMetadata(output);
-    if (metadata.getType().isFile()) {
-      var digest = DigestUtil.buildDigest(metadata.getDigest(), metadata.getSize());
-      var path = execRoot.getRelative(output.getExecPath()).relativeTo(outputPath).toString();
-      builder.addArtifacts(
-          FinalizeArtifactsRequest.Artifact.newBuilder()
-              .setPath(path)
-              .setLocator(Any.pack(FileArtifactLocator.newBuilder().setDigest(digest).build()))
-              .build());
-    }
-  }
-
   private record BazelOutputServiceFile(Digest digest) implements FileStatusWithDigest {
     @Override
     public boolean isFile() {
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
index f374fa3..15a49ef 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
@@ -605,10 +605,11 @@
               env.getOutputBase(),
               env::getExecRoot,
               () -> env.getDirectories().getOutputPath(env.getWorkspaceName()),
-              digestUtil.getDigestFunction(),
+              digestUtil,
               remoteOptions.getRemoteCache(),
               remoteOptions.getRemoteInstanceName(),
               remoteOptions.getRemoteOutputServiceOutputPathPrefix(),
+              remoteOptions.getMaxOutboundMessageSize(),
               verboseFailures,
               retrier,
               bazelOutputServiceChannel,
diff --git a/src/test/java/com/google/devtools/build/lib/remote/BUILD b/src/test/java/com/google/devtools/build/lib/remote/BUILD
index 86fc9e6..42ffe31 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/BUILD
+++ b/src/test/java/com/google/devtools/build/lib/remote/BUILD
@@ -183,6 +183,8 @@
         "//src/main/java/com/google/devtools/build/lib/vfs/bazel",
         "//src/main/java/com/google/devtools/build/lib/vfs/inmemoryfs",
         "//src/main/java/com/google/devtools/common/options",
+        "//src/main/protobuf:bazel_output_service_java_grpc",
+        "//src/main/protobuf:bazel_output_service_java_proto",
         "//src/main/protobuf:cache_salt_java_proto",
         "//src/main/protobuf:failure_details_java_proto",
         "//src/main/protobuf:invocation_policy_java_proto",
diff --git a/src/test/java/com/google/devtools/build/lib/remote/BazelOutputServiceTest.java b/src/test/java/com/google/devtools/build/lib/remote/BazelOutputServiceTest.java
new file mode 100644
index 0000000..f475c9d
--- /dev/null
+++ b/src/test/java/com/google/devtools/build/lib/remote/BazelOutputServiceTest.java
@@ -0,0 +1,315 @@
+// Copyright 2026 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.remote;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.devtools.build.lib.actions.Action;
+import com.google.devtools.build.lib.actions.Artifact;
+import com.google.devtools.build.lib.actions.ArtifactRoot;
+import com.google.devtools.build.lib.actions.FileArtifactValue;
+import com.google.devtools.build.lib.actions.FileStateType;
+import com.google.devtools.build.lib.actions.cache.OutputMetadataStore;
+import com.google.devtools.build.lib.actions.util.ActionsTestUtil;
+import com.google.devtools.build.lib.events.EventHandler;
+import com.google.devtools.build.lib.remote.BazelOutputServiceProto.FinalizeArtifactsRequest;
+import com.google.devtools.build.lib.remote.BazelOutputServiceProto.FinalizeArtifactsResponse;
+import com.google.devtools.build.lib.remote.BazelOutputServiceProto.StageArtifactsRequest;
+import com.google.devtools.build.lib.remote.BazelOutputServiceProto.StageArtifactsResponse;
+import com.google.devtools.build.lib.remote.BazelOutputServiceProto.StartBuildRequest;
+import com.google.devtools.build.lib.remote.BazelOutputServiceProto.StartBuildResponse;
+import com.google.devtools.build.lib.remote.RemoteExecutionService.ActionResultMetadata.FileMetadata;
+import com.google.devtools.build.lib.remote.util.DigestUtil;
+import com.google.devtools.build.lib.vfs.DigestHashFunction;
+import com.google.devtools.build.lib.vfs.FileSystem;
+import com.google.devtools.build.lib.vfs.Path;
+import com.google.devtools.build.lib.vfs.SyscallCache;
+import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem;
+import com.google.rpc.Status;
+import io.grpc.Channel;
+import io.grpc.Server;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.stub.StreamObserver;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class BazelOutputServiceTest {
+
+  private static final String SERVER_NAME = "fake-server";
+
+  private static class FakeBazelOutputService
+      extends BazelOutputServiceGrpc.BazelOutputServiceImplBase {
+    private final List<StageArtifactsRequest> stageRequests = new ArrayList<>();
+    private final List<FinalizeArtifactsRequest> finalizeRequests = new ArrayList<>();
+
+    @Override
+    public void startBuild(
+        StartBuildRequest request, StreamObserver<StartBuildResponse> responseObserver) {
+      responseObserver.onNext(
+          StartBuildResponse.newBuilder().setOutputPathSuffix("suffix").build());
+      responseObserver.onCompleted();
+    }
+
+    @Override
+    public void stageArtifacts(
+        StageArtifactsRequest request, StreamObserver<StageArtifactsResponse> responseObserver) {
+      stageRequests.add(request);
+      StageArtifactsResponse.Builder response = StageArtifactsResponse.newBuilder();
+      for (int i = 0; i < request.getArtifactsCount(); i++) {
+        response.addResponses(
+            StageArtifactsResponse.Response.newBuilder()
+                .setStatus(Status.newBuilder().setCode(0).build()) // OK
+                .build());
+      }
+      responseObserver.onNext(response.build());
+      responseObserver.onCompleted();
+    }
+
+    @Override
+    public void finalizeArtifacts(
+        FinalizeArtifactsRequest request,
+        StreamObserver<FinalizeArtifactsResponse> responseObserver) {
+      finalizeRequests.add(request);
+      responseObserver.onNext(FinalizeArtifactsResponse.getDefaultInstance());
+      responseObserver.onCompleted();
+    }
+
+    List<StageArtifactsRequest> getStageRequests() {
+      return stageRequests;
+    }
+
+    List<FinalizeArtifactsRequest> getFinalizeRequests() {
+      return finalizeRequests;
+    }
+  }
+
+  private final FakeBazelOutputService fakeService = new FakeBazelOutputService();
+  private Server server;
+  private Channel inProcessChannel;
+  private FileSystem fileSystem;
+  private Path outputBase;
+  private Path execRoot;
+  private Path outputPath;
+  private DigestUtil digestUtil;
+  private ArtifactRoot artifactRoot;
+
+  @Before
+  public void setUp() throws Exception {
+    server =
+        InProcessServerBuilder.forName(SERVER_NAME)
+            .addService(fakeService)
+            .directExecutor()
+            .build()
+            .start();
+
+    inProcessChannel = InProcessChannelBuilder.forName(SERVER_NAME).directExecutor().build();
+    fileSystem = new InMemoryFileSystem(DigestHashFunction.SHA256);
+    outputBase = fileSystem.getPath("/output_base");
+    execRoot = fileSystem.getPath("/workspace");
+    outputPath = execRoot.getRelative("outputs");
+    digestUtil = new DigestUtil(SyscallCache.NO_CACHE, DigestHashFunction.SHA256);
+    artifactRoot = ArtifactRoot.asDerivedRoot(execRoot, ArtifactRoot.RootType.OUTPUT, "outputs");
+    artifactRoot.getRoot().asPath().createDirectoryAndParents();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    inProcessChannel = null;
+    server.shutdownNow();
+    server.awaitTermination();
+  }
+
+  private BazelOutputService createService(int maxOutboundMessageSize) throws Exception {
+    ReferenceCountedChannel channel = mock(ReferenceCountedChannel.class);
+    when(channel.withChannelBlocking(any()))
+        .thenAnswer(
+            invocation -> {
+              ReferenceCountedChannel.IOFunction<Channel, ?> formula = invocation.getArgument(0);
+              return formula.apply(inProcessChannel);
+            });
+
+    RemoteRetrier retrier = mock(RemoteRetrier.class);
+    when(retrier.execute(any()))
+        .thenAnswer(
+            invocation -> {
+              Retrier.RetryableCallable<?, ?> callable = invocation.getArgument(0);
+              return callable.call();
+            });
+
+    BazelOutputService service =
+        new BazelOutputService(
+            outputBase,
+            () -> execRoot,
+            () -> outputPath,
+            digestUtil,
+            "cache",
+            "instance",
+            "/prefix",
+            maxOutboundMessageSize,
+            /* verboseFailures= */ false,
+            retrier,
+            channel,
+            /* lastBuildId= */ null);
+    var unused =
+        service.startBuild(
+            UUID.randomUUID(), "workspace", mock(EventHandler.class), /* finalizeActions= */ true);
+    return service;
+  }
+
+  @Test
+  public void stageArtifacts_smallMessage_singleBatch() throws Exception {
+    BazelOutputService service = createService(10000); // large limit
+    List<FileMetadata> files = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      FileMetadata file = mock(FileMetadata.class);
+      when(file.path()).thenReturn(outputPath.getRelative("file" + i));
+      when(file.digest()).thenReturn(digestUtil.compute(new byte[] {(byte) i}));
+      files.add(file);
+    }
+
+    service.stageArtifacts(files);
+
+    assertThat(fakeService.getStageRequests()).hasSize(1);
+    assertThat(fakeService.getStageRequests().get(0).getArtifactsCount()).isEqualTo(5);
+  }
+
+  @Test
+  public void stageArtifacts_largeMessage_multipleBatches() throws Exception {
+    // Set a small limit to force splitting.
+    // We estimate size as: base_size + (entry_size + path_length) * N
+    // Entry size without path is estimated with some overhead, let's say it is ~100 bytes.
+    // If we set limit to 300, and have paths of length ~10, we should fit about 2-3 artifacts per
+    // batch.
+    BazelOutputService service = createService(300);
+    List<FileMetadata> files = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      FileMetadata file = mock(FileMetadata.class);
+      // Use a relatively long path to increase size
+      when(file.path()).thenReturn(outputPath.getRelative("very/long/path/to/file/number/" + i));
+      when(file.digest()).thenReturn(digestUtil.compute(new byte[] {(byte) i}));
+      files.add(file);
+    }
+
+    service.stageArtifacts(files);
+
+    // Verify that we split into multiple requests
+    assertThat(fakeService.getStageRequests().size()).isGreaterThan(1);
+    int totalStaged = 0;
+    for (StageArtifactsRequest request : fakeService.getStageRequests()) {
+      totalStaged += request.getArtifactsCount();
+      // Verify each request is within limit (approximately, using our estimation)
+      // Protobuf serialized size should be within limit.
+      assertThat(request.getSerializedSize()).isAtMost(300);
+    }
+    assertThat(totalStaged).isEqualTo(10);
+  }
+
+  @Test
+  public void finalizeArtifacts_smallMessage_singleBatch() throws Exception {
+    BazelOutputService service = createService(10000); // large limit
+    Action action = mock(Action.class);
+    List<Artifact> outputs = new ArrayList<>();
+    OutputMetadataStore outputMetadataStore = mock(OutputMetadataStore.class);
+
+    for (int i = 0; i < 5; i++) {
+      Artifact artifact = ActionsTestUtil.createArtifact(artifactRoot, "out" + i);
+      outputs.add(artifact);
+
+      FileArtifactValue metadata = mock(FileArtifactValue.class);
+      when(metadata.getType()).thenReturn(FileStateType.REGULAR_FILE);
+      when(metadata.getDigest()).thenReturn(new byte[] {(byte) i});
+      when(metadata.getSize()).thenReturn(100L);
+      when(outputMetadataStore.getOutputMetadata(artifact)).thenReturn(metadata);
+    }
+    when(action.getOutputs()).thenReturn(outputs);
+
+    service.finalizeAction(action, outputMetadataStore);
+
+    assertThat(fakeService.getFinalizeRequests()).hasSize(1);
+    assertThat(fakeService.getFinalizeRequests().get(0).getArtifactsCount()).isEqualTo(5);
+  }
+
+  @Test
+  public void finalizeArtifacts_largeMessage_multipleBatches() throws Exception {
+    BazelOutputService service = createService(300); // small limit
+    Action action = mock(Action.class);
+    List<Artifact> outputs = new ArrayList<>();
+    OutputMetadataStore outputMetadataStore = mock(OutputMetadataStore.class);
+
+    for (int i = 0; i < 10; i++) {
+      Artifact artifact =
+          ActionsTestUtil.createArtifact(artifactRoot, "very/long/path/to/output/file/number/" + i);
+      outputs.add(artifact);
+
+      FileArtifactValue metadata = mock(FileArtifactValue.class);
+      when(metadata.getType()).thenReturn(FileStateType.REGULAR_FILE);
+      when(metadata.getDigest()).thenReturn(new byte[] {(byte) i});
+      when(metadata.getSize()).thenReturn(100L);
+      when(outputMetadataStore.getOutputMetadata(artifact)).thenReturn(metadata);
+    }
+    when(action.getOutputs()).thenReturn(outputs);
+
+    service.finalizeAction(action, outputMetadataStore);
+
+    assertThat(fakeService.getFinalizeRequests().size()).isGreaterThan(1);
+    int totalFinalized = 0;
+    for (FinalizeArtifactsRequest request : fakeService.getFinalizeRequests()) {
+      totalFinalized += request.getArtifactsCount();
+      assertThat(request.getSerializedSize()).isAtMost(300);
+    }
+    assertThat(totalFinalized).isEqualTo(10);
+  }
+
+  @Test
+  public void finalizeArtifacts_giantArtifact_noEmptyRequest() throws Exception {
+    // Limit is small (150), but the base request size (with buildId) plus one giant artifact
+    // will definitely exceed it.
+    // We want to verify that we don't send an empty request first.
+    BazelOutputService service = createService(150);
+    Action action = mock(Action.class);
+    List<Artifact> outputs = new ArrayList<>();
+    OutputMetadataStore outputMetadataStore = mock(OutputMetadataStore.class);
+
+    // Add one giant artifact
+    Artifact artifact =
+        ActionsTestUtil.createArtifact(
+            artifactRoot, "a/very/" + "long/".repeat(20) + "path/to/giant/artifact");
+    outputs.add(artifact);
+
+    FileArtifactValue metadata = mock(FileArtifactValue.class);
+    when(metadata.getType()).thenReturn(FileStateType.REGULAR_FILE);
+    when(metadata.getDigest()).thenReturn(new byte[] {1, 2, 3});
+    when(metadata.getSize()).thenReturn(100L);
+    when(outputMetadataStore.getOutputMetadata(artifact)).thenReturn(metadata);
+
+    when(action.getOutputs()).thenReturn(outputs);
+
+    service.finalizeAction(action, outputMetadataStore);
+
+    // We expect exactly 1 request (the one containing the giant artifact).
+    assertThat(fakeService.getFinalizeRequests()).hasSize(1);
+    assertThat(fakeService.getFinalizeRequests().get(0).getArtifactsCount()).isEqualTo(1);
+  }
+}