blob: ba857c822c65428ec0f0431294cfa463f52d2a25 [file] [log] [blame]
buchgrdc240042017-07-08 12:47:58 +02001// Copyright 2017 The Bazel Authors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14package com.google.devtools.build.lib.remote;
15
Googler46104c62021-12-10 12:56:03 -080016import static com.google.common.base.Preconditions.checkState;
buchgrdc240042017-07-08 12:47:58 +020017import static com.google.common.truth.Truth.assertThat;
olaola6f32d5a2017-09-20 17:12:19 +020018import static java.nio.charset.StandardCharsets.UTF_8;
Chi Wangcb1f9f62020-09-17 23:37:19 -070019import static org.junit.Assert.assertThrows;
buchgrdc240042017-07-08 12:47:58 +020020import static org.junit.Assert.fail;
George Gensure51226172020-12-01 01:33:42 -080021import static org.mockito.ArgumentMatchers.any;
buchgrdc240042017-07-08 12:47:58 +020022
olaolaf0aa55d2018-08-16 08:51:06 -070023import build.bazel.remote.execution.v2.Digest;
24import build.bazel.remote.execution.v2.RequestMetadata;
Alessandro Patti6da80862021-11-11 22:49:37 -080025import com.github.luben.zstd.Zstd;
26import com.github.luben.zstd.ZstdInputStream;
buchgrdc240042017-07-08 12:47:58 +020027import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase;
George Gensure3c9089b2019-05-02 07:07:55 -070028import com.google.bytestream.ByteStreamProto.QueryWriteStatusRequest;
29import com.google.bytestream.ByteStreamProto.QueryWriteStatusResponse;
buchgrdc240042017-07-08 12:47:58 +020030import com.google.bytestream.ByteStreamProto.WriteRequest;
31import com.google.bytestream.ByteStreamProto.WriteResponse;
olaola6f32d5a2017-09-20 17:12:19 +020032import com.google.common.collect.ImmutableList;
Benjamin Petersonc26ec2d2022-06-15 02:27:25 -070033import com.google.common.collect.ImmutableMap;
George Gensure3c9089b2019-05-02 07:07:55 -070034import com.google.common.collect.Maps;
35import com.google.common.collect.Sets;
36import com.google.common.hash.HashCode;
buchgrdc240042017-07-08 12:47:58 +020037import com.google.common.util.concurrent.ListenableFuture;
38import com.google.common.util.concurrent.ListeningScheduledExecutorService;
39import com.google.common.util.concurrent.MoreExecutors;
olaola6f32d5a2017-09-20 17:12:19 +020040import com.google.devtools.build.lib.analysis.BlazeVersionInfo;
Chi Wangcb1f9f62020-09-17 23:37:19 -070041import com.google.devtools.build.lib.authandtls.CallCredentialsProvider;
Googler37ee2522021-01-28 22:54:20 -080042import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
Googlerde8f69d2021-02-26 07:07:48 +010043import com.google.devtools.build.lib.remote.grpc.ChannelConnectionFactory;
Googler922d1e62018-03-05 14:49:00 -080044import com.google.devtools.build.lib.remote.util.DigestUtil;
Jakob Buchgraberbc06db92019-03-07 00:21:52 -080045import com.google.devtools.build.lib.remote.util.TestUtils;
Googler922d1e62018-03-05 14:49:00 -080046import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
ccalvarinbda12a12018-06-21 18:57:26 -070047import com.google.devtools.build.lib.vfs.DigestHashFunction;
janakr491e4412022-01-28 15:35:34 -080048import com.google.devtools.build.lib.vfs.SyscallCache;
buchgrdc240042017-07-08 12:47:58 +020049import com.google.protobuf.ByteString;
olaola6f32d5a2017-09-20 17:12:19 +020050import io.grpc.BindableService;
Chi Wangcb1f9f62020-09-17 23:37:19 -070051import io.grpc.CallCredentials;
buchgrdc240042017-07-08 12:47:58 +020052import io.grpc.Metadata;
53import io.grpc.Server;
54import io.grpc.ServerCall;
buchgrdc240042017-07-08 12:47:58 +020055import io.grpc.ServerCallHandler;
Alessandro Patti52c87732020-03-03 04:18:12 -080056import io.grpc.ServerInterceptor;
olaola6f32d5a2017-09-20 17:12:19 +020057import io.grpc.ServerInterceptors;
buchgrdc240042017-07-08 12:47:58 +020058import io.grpc.Status;
59import io.grpc.Status.Code;
George Gensureb7d1d4b2019-01-09 11:56:51 -080060import io.grpc.StatusRuntimeException;
buchgrdc240042017-07-08 12:47:58 +020061import io.grpc.inprocess.InProcessChannelBuilder;
62import io.grpc.inprocess.InProcessServerBuilder;
Alessandro Patti52c87732020-03-03 04:18:12 -080063import io.grpc.stub.MetadataUtils;
buchgrdc240042017-07-08 12:47:58 +020064import io.grpc.stub.StreamObserver;
65import io.grpc.util.MutableHandlerRegistry;
Googlerde8f69d2021-02-26 07:07:48 +010066import io.reactivex.rxjava3.core.Single;
George Gensure3c9089b2019-05-02 07:07:55 -070067import java.io.ByteArrayInputStream;
Alessandro Patti6da80862021-11-11 22:49:37 -080068import java.io.ByteArrayOutputStream;
buchgrdc240042017-07-08 12:47:58 +020069import java.io.IOException;
George Gensure3c9089b2019-05-02 07:07:55 -070070import java.io.InputStream;
buchgrdc240042017-07-08 12:47:58 +020071import java.util.ArrayList;
Benjamin Petersondd57d412022-06-15 03:25:41 -070072import java.util.Arrays;
buchgrdc240042017-07-08 12:47:58 +020073import java.util.Collections;
buchgrdc240042017-07-08 12:47:58 +020074import java.util.List;
75import java.util.Map;
76import java.util.Random;
77import java.util.Set;
buchgrdc240042017-07-08 12:47:58 +020078import java.util.concurrent.Executors;
buchgrdc240042017-07-08 12:47:58 +020079import java.util.concurrent.RejectedExecutionException;
80import java.util.concurrent.TimeUnit;
81import java.util.concurrent.atomic.AtomicInteger;
Googler46104c62021-12-10 12:56:03 -080082import java.util.function.Supplier;
Chi Wangcb1f9f62020-09-17 23:37:19 -070083import javax.annotation.Nullable;
buchgrdc240042017-07-08 12:47:58 +020084import org.junit.After;
85import org.junit.Before;
86import org.junit.Test;
87import org.junit.runner.RunWith;
88import org.junit.runners.JUnit4;
89import org.mockito.Mock;
90import org.mockito.Mockito;
91import org.mockito.MockitoAnnotations;
92
Googler37ee2522021-01-28 22:54:20 -080093/** Tests for {@link ByteStreamUploader}. */
buchgrdc240042017-07-08 12:47:58 +020094@RunWith(JUnit4.class)
95public class ByteStreamUploaderTest {
96
janakr491e4412022-01-28 15:35:34 -080097 private static final DigestUtil DIGEST_UTIL =
98 new DigestUtil(SyscallCache.NO_CACHE, DigestHashFunction.SHA256);
buchgr559a07d2017-11-30 11:09:35 -080099
buchgrdc240042017-07-08 12:47:58 +0200100 private static final int CHUNK_SIZE = 10;
101 private static final String INSTANCE_NAME = "foo";
102
103 private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
michajlo0f99c3c2020-03-09 16:07:50 -0700104 private ListeningScheduledExecutorService retryService;
buchgrdc240042017-07-08 12:47:58 +0200105
Googlerde8f69d2021-02-26 07:07:48 +0100106 private final String serverName = "Server for " + this.getClass();
buchgrdc240042017-07-08 12:47:58 +0200107 private Server server;
Benjamin Peterson20c61df2022-07-25 01:37:10 -0700108 private ReferenceCountedChannel referenceCountedChannel;
Googler37ee2522021-01-28 22:54:20 -0800109 private RemoteActionExecutionContext context;
buchgrdc240042017-07-08 12:47:58 +0200110
buchgr44e40bc2017-12-04 10:44:47 -0800111 @Mock private Retrier.Backoff mockBackoff;
buchgrdc240042017-07-08 12:47:58 +0200112
113 @Before
olaola6f32d5a2017-09-20 17:12:19 +0200114 public final void setUp() throws Exception {
buchgrdc240042017-07-08 12:47:58 +0200115 MockitoAnnotations.initMocks(this);
116
Googler37ee2522021-01-28 22:54:20 -0800117 server =
118 InProcessServerBuilder.forName(serverName)
119 .fallbackHandlerRegistry(serviceRegistry)
120 .build()
121 .start();
Benjamin Peterson20c61df2022-07-25 01:37:10 -0700122 referenceCountedChannel =
123 new ReferenceCountedChannel(
124 new ChannelConnectionFactory() {
125 @Override
126 public Single<? extends ChannelConnection> create() {
127 return Single.just(
128 new ChannelConnection(InProcessChannelBuilder.forName(serverName).build()));
129 }
Googlerde8f69d2021-02-26 07:07:48 +0100130
Benjamin Peterson20c61df2022-07-25 01:37:10 -0700131 @Override
132 public int maxConcurrency() {
133 return 100;
134 }
135 });
Googler37ee2522021-01-28 22:54:20 -0800136 RequestMetadata metadata =
137 TracingMetadataUtils.buildMetadata(
138 "none",
139 "none",
Daniel Wagner-Halla750a562021-04-15 19:16:44 -0700140 DIGEST_UTIL.asActionKey(Digest.getDefaultInstance()).getDigest().getHash(),
141 null);
Googlerf8d49fa2021-01-29 00:03:35 -0800142 context = RemoteActionExecutionContext.create(metadata);
michajlo0f99c3c2020-03-09 16:07:50 -0700143
144 retryService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
buchgrdc240042017-07-08 12:47:58 +0200145 }
146
147 @After
olaola6f32d5a2017-09-20 17:12:19 +0200148 public void tearDown() throws Exception {
Benjamin Peterson20c61df2022-07-25 01:37:10 -0700149 referenceCountedChannel.release();
michajlo0f99c3c2020-03-09 16:07:50 -0700150 retryService.shutdownNow();
151 retryService.awaitTermination(
152 com.google.devtools.build.lib.testutil.TestUtils.WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
153
buchgrdc240042017-07-08 12:47:58 +0200154 server.shutdownNow();
olaola28228e02018-02-08 11:04:46 -0800155 server.awaitTermination();
buchgrdc240042017-07-08 12:47:58 +0200156 }
157
Jakob Buchgraber315e5b12018-06-26 05:37:08 -0700158 @Test
buchgrdc240042017-07-08 12:47:58 +0200159 public void singleBlobUploadShouldWork() throws Exception {
buchgr44e40bc2017-12-04 10:44:47 -0800160 RemoteRetrier retrier =
Jakob Buchgraberbc06db92019-03-07 00:21:52 -0800161 TestUtils.newRemoteRetrier(() -> mockBackoff, (e) -> true, retryService);
olaola98104a092019-05-01 06:48:51 -0700162 ByteStreamUploader uploader =
163 new ByteStreamUploader(
164 INSTANCE_NAME,
Benjamin Peterson20c61df2022-07-25 01:37:10 -0700165 referenceCountedChannel,
jcater9d1737d2020-09-20 18:16:39 -0700166 CallCredentialsProvider.NO_CREDENTIALS,
167 /* callTimeoutSecs= */ 60,
Googler46104c62021-12-10 12:56:03 -0800168 retrier,
169 /*maximumOpenFiles=*/ -1);
buchgrdc240042017-07-08 12:47:58 +0200170
171 byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
172 new Random().nextBytes(blob);
173
George Gensure3c9089b2019-05-02 07:07:55 -0700174 Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
Chi Wanga0fe0d82022-02-02 08:09:44 -0800175 Digest digest = DIGEST_UTIL.compute(blob);
buchgrdc240042017-07-08 12:47:58 +0200176
Googler46104c62021-12-10 12:56:03 -0800177 serviceRegistry.addService(TestUtils.newNoErrorByteStreamService(blob));
buchgrdc240042017-07-08 12:47:58 +0200178
Chi Wanga0fe0d82022-02-02 08:09:44 -0800179 uploader.uploadBlob(context, digest, chunker);
George Gensure3c9089b2019-05-02 07:07:55 -0700180
181 // This test should not have triggered any retries.
cushon4f287b02021-09-28 23:12:54 -0700182 Mockito.verifyNoInteractions(mockBackoff);
George Gensure3c9089b2019-05-02 07:07:55 -0700183 }
184
185 @Test
Brandon Duffany9ad35112022-04-19 06:47:23 -0700186 public void singleChunkCompressedUploadAlreadyExists() throws Exception {
187 RemoteRetrier retrier =
188 TestUtils.newRemoteRetrier(() -> mockBackoff, (e) -> true, retryService);
189 ByteStreamUploader uploader =
190 new ByteStreamUploader(
191 INSTANCE_NAME,
Benjamin Peterson20c61df2022-07-25 01:37:10 -0700192 referenceCountedChannel,
Brandon Duffany9ad35112022-04-19 06:47:23 -0700193 CallCredentialsProvider.NO_CREDENTIALS,
194 /* callTimeoutSecs= */ 60,
195 retrier,
196 /* maximumOpenFiles= */ -1);
197
198 byte[] blob = {'A'};
199
Xiangquan Xiao7c0bdc22022-10-27 00:05:38 -0700200 // Set a chunk size that should have no problem accommodating the compressed
Brandon Duffany9ad35112022-04-19 06:47:23 -0700201 // blob, even though the blob most likely has a compression ratio >= 1.
202 Chunker chunker =
203 Chunker.builder().setInput(blob).setCompressed(true).setChunkSize(100).build();
204 Digest digest = DIGEST_UTIL.compute(blob);
205
206 serviceRegistry.addService(
207 new ByteStreamImplBase() {
208 @Override
209 public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
210 return new StreamObserver<WriteRequest>() {
211 private int numChunksReceived = 0;
212
213 @Override
214 public void onNext(WriteRequest writeRequest) {
215 // This should be the first and only chunk written.
216 numChunksReceived++;
217 assertThat(numChunksReceived).isEqualTo(1);
218 ByteString data = writeRequest.getData();
219 assertThat(data.size()).isGreaterThan(0);
220 assertThat(writeRequest.getFinishWrite()).isTrue();
221
222 // On receiving the chunk, respond with a committed size of -1
223 // to indicate that the blob already exists (per the remote API
224 // spec) and close the stream.
225 WriteResponse response = WriteResponse.newBuilder().setCommittedSize(-1).build();
226 streamObserver.onNext(response);
227 streamObserver.onCompleted();
228 }
229
230 @Override
231 public void onError(Throwable throwable) {
232 fail("onError should never be called.");
233 }
234
235 @Override
236 public void onCompleted() {
237 streamObserver.onCompleted();
238 }
239 };
240 }
241 });
242
243 uploader.uploadBlob(context, digest, chunker);
244
245 // This test should not have triggered any retries.
246 Mockito.verifyNoInteractions(mockBackoff);
247 }
248
249 @Test
George Gensure3c9089b2019-05-02 07:07:55 -0700250 public void progressiveUploadShouldWork() throws Exception {
George Gensure3c9089b2019-05-02 07:07:55 -0700251 Mockito.when(mockBackoff.getRetryAttempts()).thenReturn(0);
252 RemoteRetrier retrier =
253 TestUtils.newRemoteRetrier(() -> mockBackoff, (e) -> true, retryService);
254 ByteStreamUploader uploader =
255 new ByteStreamUploader(
Chi Wangcb1f9f62020-09-17 23:37:19 -0700256 INSTANCE_NAME,
Benjamin Peterson20c61df2022-07-25 01:37:10 -0700257 referenceCountedChannel,
Chi Wangcb1f9f62020-09-17 23:37:19 -0700258 CallCredentialsProvider.NO_CREDENTIALS,
259 3,
Googler46104c62021-12-10 12:56:03 -0800260 retrier,
261 /*maximumOpenFiles=*/ -1);
George Gensure3c9089b2019-05-02 07:07:55 -0700262
263 byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
264 new Random().nextBytes(blob);
265
266 Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
Chi Wanga0fe0d82022-02-02 08:09:44 -0800267 Digest digest = DIGEST_UTIL.compute(blob);
George Gensure3c9089b2019-05-02 07:07:55 -0700268
269 serviceRegistry.addService(
270 new ByteStreamImplBase() {
271
272 byte[] receivedData = new byte[blob.length];
273 String receivedResourceName = null;
274 boolean receivedComplete = false;
275 long nextOffset = 0;
276 long initialOffset = 0;
277 boolean mustQueryWriteStatus = false;
278
279 @Override
280 public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
281 return new StreamObserver<WriteRequest>() {
282 @Override
283 public void onNext(WriteRequest writeRequest) {
284 assertThat(mustQueryWriteStatus).isFalse();
285
286 String resourceName = writeRequest.getResourceName();
287 if (nextOffset == initialOffset) {
288 if (initialOffset == 0) {
289 receivedResourceName = resourceName;
290 }
291 assertThat(resourceName).startsWith(INSTANCE_NAME + "/uploads");
292 assertThat(resourceName).endsWith(String.valueOf(blob.length));
293 } else {
294 assertThat(resourceName).isEmpty();
295 }
296
297 assertThat(writeRequest.getWriteOffset()).isEqualTo(nextOffset);
298
299 ByteString data = writeRequest.getData();
300
301 System.arraycopy(
302 data.toByteArray(), 0, receivedData, (int) nextOffset, data.size());
303
304 nextOffset += data.size();
305 receivedComplete = blob.length == nextOffset;
306 assertThat(writeRequest.getFinishWrite()).isEqualTo(receivedComplete);
307
308 if (initialOffset == 0) {
309 streamObserver.onError(Status.DEADLINE_EXCEEDED.asException());
310 mustQueryWriteStatus = true;
311 initialOffset = nextOffset;
312 }
313 }
314
315 @Override
316 public void onError(Throwable throwable) {
317 fail("onError should never be called.");
318 }
319
320 @Override
321 public void onCompleted() {
322 assertThat(nextOffset).isEqualTo(blob.length);
323 assertThat(receivedData).isEqualTo(blob);
324
325 WriteResponse response =
326 WriteResponse.newBuilder().setCommittedSize(nextOffset).build();
327 streamObserver.onNext(response);
328 streamObserver.onCompleted();
329 }
330 };
331 }
332
333 @Override
334 public void queryWriteStatus(
335 QueryWriteStatusRequest request, StreamObserver<QueryWriteStatusResponse> response) {
336 String resourceName = request.getResourceName();
337 final long committedSize;
338 final boolean complete;
339 if (receivedResourceName != null && receivedResourceName.equals(resourceName)) {
340 assertThat(mustQueryWriteStatus).isTrue();
341 mustQueryWriteStatus = false;
342 committedSize = nextOffset;
343 complete = receivedComplete;
344 } else {
345 committedSize = 0;
346 complete = false;
347 }
348 response.onNext(
349 QueryWriteStatusResponse.newBuilder()
350 .setCommittedSize(committedSize)
351 .setComplete(complete)
352 .build());
353 response.onCompleted();
354 }
355 });
356
Chi Wanga0fe0d82022-02-02 08:09:44 -0800357 uploader.uploadBlob(context, digest, chunker);
George Gensure3c9089b2019-05-02 07:07:55 -0700358
Benjamin Petersonde196ab2022-06-13 05:55:16 -0700359 // This test triggers one retry.
360 Mockito.verify(mockBackoff, Mockito.times(1))
361 .nextDelayMillis(any(StatusRuntimeException.class));
George Gensure3c9089b2019-05-02 07:07:55 -0700362 Mockito.verify(mockBackoff, Mockito.times(1)).getRetryAttempts();
George Gensure3c9089b2019-05-02 07:07:55 -0700363 }
364
365 @Test
Alessandro Patti6da80862021-11-11 22:49:37 -0800366 public void progressiveCompressedUploadShouldWork() throws Exception {
367 Mockito.when(mockBackoff.getRetryAttempts()).thenReturn(0);
368 RemoteRetrier retrier =
369 TestUtils.newRemoteRetrier(() -> mockBackoff, (e) -> true, retryService);
370 ByteStreamUploader uploader =
371 new ByteStreamUploader(
372 INSTANCE_NAME,
Benjamin Peterson20c61df2022-07-25 01:37:10 -0700373 referenceCountedChannel,
Alessandro Patti6da80862021-11-11 22:49:37 -0800374 CallCredentialsProvider.NO_CREDENTIALS,
375 300,
Googler46104c62021-12-10 12:56:03 -0800376 retrier,
377 /*maximumOpenFiles=*/ -1);
Alessandro Patti6da80862021-11-11 22:49:37 -0800378
Benjamin Petersondd57d412022-06-15 03:25:41 -0700379 int chunkSize = 1024;
380 int skipSize = chunkSize + 1;
381 byte[] blob = new byte[chunkSize * 2 + 1];
Alessandro Patti6da80862021-11-11 22:49:37 -0800382 new Random().nextBytes(blob);
383
384 Chunker chunker =
Benjamin Petersondd57d412022-06-15 03:25:41 -0700385 Chunker.builder().setInput(blob).setCompressed(true).setChunkSize(chunkSize).build();
Chi Wanga0fe0d82022-02-02 08:09:44 -0800386 Digest digest = DIGEST_UTIL.compute(blob);
Alessandro Patti6da80862021-11-11 22:49:37 -0800387
Benjamin Petersondd57d412022-06-15 03:25:41 -0700388 ByteArrayOutputStream output = new ByteArrayOutputStream();
Alessandro Patti6da80862021-11-11 22:49:37 -0800389 serviceRegistry.addService(
390 new ByteStreamImplBase() {
Alessandro Patti6da80862021-11-11 22:49:37 -0800391 String receivedResourceName = null;
392 boolean receivedComplete = false;
393 long nextOffset = 0;
394 long initialOffset = 0;
395 boolean mustQueryWriteStatus = false;
396
397 @Override
398 public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
399 return new StreamObserver<WriteRequest>() {
400 @Override
401 public void onNext(WriteRequest writeRequest) {
402 assertThat(mustQueryWriteStatus).isFalse();
403
404 String resourceName = writeRequest.getResourceName();
405 if (nextOffset == initialOffset) {
406 if (initialOffset == 0) {
407 receivedResourceName = resourceName;
408 }
409 assertThat(resourceName).startsWith(INSTANCE_NAME + "/uploads");
410 assertThat(resourceName).endsWith(String.valueOf(blob.length));
411 } else {
412 assertThat(resourceName).isEmpty();
413 }
414
Alessandro Patti6da80862021-11-11 22:49:37 -0800415 if (initialOffset == 0) {
416 streamObserver.onError(Status.DEADLINE_EXCEEDED.asException());
417 mustQueryWriteStatus = true;
Benjamin Petersondd57d412022-06-15 03:25:41 -0700418 initialOffset = skipSize;
419 nextOffset = initialOffset;
420 } else {
421 ByteString data = writeRequest.getData();
422 try {
423 data.writeTo(output);
424 } catch (IOException e) {
425 streamObserver.onError(e);
426 return;
427 }
428 nextOffset += data.size();
429 receivedComplete = writeRequest.getFinishWrite();
Alessandro Patti6da80862021-11-11 22:49:37 -0800430 }
431 }
432
433 @Override
434 public void onError(Throwable throwable) {
435 fail("onError should never be called.");
436 }
437
438 @Override
439 public void onCompleted() {
Alessandro Patti6da80862021-11-11 22:49:37 -0800440 WriteResponse response =
441 WriteResponse.newBuilder().setCommittedSize(nextOffset).build();
442 streamObserver.onNext(response);
443 streamObserver.onCompleted();
444 }
445 };
446 }
447
448 @Override
449 public void queryWriteStatus(
450 QueryWriteStatusRequest request, StreamObserver<QueryWriteStatusResponse> response) {
451 String resourceName = request.getResourceName();
452 final long committedSize;
453 final boolean complete;
454 if (receivedResourceName != null && receivedResourceName.equals(resourceName)) {
455 assertThat(mustQueryWriteStatus).isTrue();
456 mustQueryWriteStatus = false;
Benjamin Petersondd57d412022-06-15 03:25:41 -0700457 committedSize = receivedComplete ? blob.length : skipSize;
Alessandro Patti6da80862021-11-11 22:49:37 -0800458 complete = receivedComplete;
459 } else {
460 committedSize = 0;
461 complete = false;
462 }
463 response.onNext(
464 QueryWriteStatusResponse.newBuilder()
465 .setCommittedSize(committedSize)
466 .setComplete(complete)
467 .build());
468 response.onCompleted();
469 }
470 });
471
Chi Wanga0fe0d82022-02-02 08:09:44 -0800472 uploader.uploadBlob(context, digest, chunker);
Benjamin Petersondd57d412022-06-15 03:25:41 -0700473 byte[] decompressed = Zstd.decompress(output.toByteArray(), blob.length - skipSize);
474 assertThat(Arrays.equals(decompressed, 0, decompressed.length, blob, skipSize, blob.length))
475 .isTrue();
Alessandro Patti6da80862021-11-11 22:49:37 -0800476
Benjamin Petersonde196ab2022-06-13 05:55:16 -0700477 // This test triggers one retry.
478 Mockito.verify(mockBackoff, Mockito.times(1)).nextDelayMillis(any(Exception.class));
Alessandro Patti6da80862021-11-11 22:49:37 -0800479 Mockito.verify(mockBackoff, Mockito.times(1)).getRetryAttempts();
Alessandro Patti6da80862021-11-11 22:49:37 -0800480 }
481
482 @Test
Jakob Buchgrabercc2b3ec2019-11-22 04:44:57 -0800483 public void concurrentlyCompletedUploadIsNotRetried() throws Exception {
484 // Test that after an upload has failed and the QueryWriteStatus call returns
485 // that the upload has completed that we'll not retry the upload.
Jakob Buchgrabercc2b3ec2019-11-22 04:44:57 -0800486 RemoteRetrier retrier =
487 TestUtils.newRemoteRetrier(() -> new FixedBackoff(1, 0), (e) -> true, retryService);
488 ByteStreamUploader uploader =
489 new ByteStreamUploader(
Chi Wangcb1f9f62020-09-17 23:37:19 -0700490 INSTANCE_NAME,
Benjamin Peterson20c61df2022-07-25 01:37:10 -0700491 referenceCountedChannel,
Chi Wangcb1f9f62020-09-17 23:37:19 -0700492 CallCredentialsProvider.NO_CREDENTIALS,
493 1,
Googler46104c62021-12-10 12:56:03 -0800494 retrier,
495 /*maximumOpenFiles=*/ -1);
Jakob Buchgrabercc2b3ec2019-11-22 04:44:57 -0800496
497 byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
498 new Random().nextBytes(blob);
499
500 Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
Chi Wanga0fe0d82022-02-02 08:09:44 -0800501 Digest digest = DIGEST_UTIL.compute(blob);
Jakob Buchgrabercc2b3ec2019-11-22 04:44:57 -0800502
503 AtomicInteger numWriteCalls = new AtomicInteger(0);
504
505 serviceRegistry.addService(
506 new ByteStreamImplBase() {
507 @Override
508 public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
509 numWriteCalls.getAndIncrement();
510 streamObserver.onError(Status.DEADLINE_EXCEEDED.asException());
511 return new StreamObserver<WriteRequest>() {
512 @Override
513 public void onNext(WriteRequest writeRequest) {}
514
515 @Override
516 public void onError(Throwable throwable) {}
517
518 @Override
519 public void onCompleted() {}
520 };
521 }
522
523 @Override
524 public void queryWriteStatus(
525 QueryWriteStatusRequest request, StreamObserver<QueryWriteStatusResponse> response) {
526 response.onNext(
527 QueryWriteStatusResponse.newBuilder()
528 .setCommittedSize(blob.length)
529 .setComplete(true)
530 .build());
531 response.onCompleted();
532 }
533 });
534
Chi Wanga0fe0d82022-02-02 08:09:44 -0800535 uploader.uploadBlob(context, digest, chunker);
Jakob Buchgrabercc2b3ec2019-11-22 04:44:57 -0800536
537 // This test should not have triggered any retries.
538 assertThat(numWriteCalls.get()).isEqualTo(1);
Jakob Buchgrabercc2b3ec2019-11-22 04:44:57 -0800539 }
540
541 @Test
George Gensure3c9089b2019-05-02 07:07:55 -0700542 public void unimplementedQueryShouldRestartUpload() throws Exception {
George Gensure3c9089b2019-05-02 07:07:55 -0700543 Mockito.when(mockBackoff.getRetryAttempts()).thenReturn(0);
544 RemoteRetrier retrier =
545 TestUtils.newRemoteRetrier(() -> mockBackoff, (e) -> true, retryService);
546 ByteStreamUploader uploader =
547 new ByteStreamUploader(
Chi Wangcb1f9f62020-09-17 23:37:19 -0700548 INSTANCE_NAME,
Benjamin Peterson20c61df2022-07-25 01:37:10 -0700549 referenceCountedChannel,
Chi Wangcb1f9f62020-09-17 23:37:19 -0700550 CallCredentialsProvider.NO_CREDENTIALS,
551 3,
Googler46104c62021-12-10 12:56:03 -0800552 retrier,
553 /*maximumOpenFiles=*/ -1);
George Gensure3c9089b2019-05-02 07:07:55 -0700554
555 byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
556 new Random().nextBytes(blob);
557
558 Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
Chi Wanga0fe0d82022-02-02 08:09:44 -0800559 Digest digest = DIGEST_UTIL.compute(blob);
George Gensure3c9089b2019-05-02 07:07:55 -0700560
561 serviceRegistry.addService(
562 new ByteStreamImplBase() {
563 boolean expireCall = true;
564 boolean sawReset = false;
565
566 @Override
567 public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
568 return new StreamObserver<WriteRequest>() {
569 @Override
570 public void onNext(WriteRequest writeRequest) {
571 if (expireCall) {
572 streamObserver.onError(Status.DEADLINE_EXCEEDED.asException());
573 expireCall = false;
574 } else if (!sawReset && writeRequest.getWriteOffset() != 0) {
575 streamObserver.onError(Status.INVALID_ARGUMENT.asException());
576 } else {
577 sawReset = true;
578 if (writeRequest.getFinishWrite()) {
579 long committedSize =
580 writeRequest.getWriteOffset() + writeRequest.getData().size();
581 streamObserver.onNext(
582 WriteResponse.newBuilder().setCommittedSize(committedSize).build());
583 streamObserver.onCompleted();
584 }
585 }
586 }
587
588 @Override
589 public void onError(Throwable throwable) {
590 fail("onError should never be called.");
591 }
592
593 @Override
594 public void onCompleted() {}
595 };
596 }
597
598 @Override
599 public void queryWriteStatus(
600 QueryWriteStatusRequest request, StreamObserver<QueryWriteStatusResponse> response) {
601 response.onError(Status.UNIMPLEMENTED.asException());
602 }
603 });
604
Chi Wanga0fe0d82022-02-02 08:09:44 -0800605 uploader.uploadBlob(context, digest, chunker);
George Gensure3c9089b2019-05-02 07:07:55 -0700606
607 // This test should have triggered a single retry, because it made
608 // no progress.
George Gensure51226172020-12-01 01:33:42 -0800609 Mockito.verify(mockBackoff, Mockito.times(1)).nextDelayMillis(any(Exception.class));
George Gensure3c9089b2019-05-02 07:07:55 -0700610 }
611
612 @Test
613 public void earlyWriteResponseShouldCompleteUpload() throws Exception {
George Gensure3c9089b2019-05-02 07:07:55 -0700614 RemoteRetrier retrier =
615 TestUtils.newRemoteRetrier(() -> mockBackoff, (e) -> true, retryService);
616 ByteStreamUploader uploader =
617 new ByteStreamUploader(
Chi Wangcb1f9f62020-09-17 23:37:19 -0700618 INSTANCE_NAME,
Benjamin Peterson20c61df2022-07-25 01:37:10 -0700619 referenceCountedChannel,
Chi Wangcb1f9f62020-09-17 23:37:19 -0700620 CallCredentialsProvider.NO_CREDENTIALS,
621 3,
Googler46104c62021-12-10 12:56:03 -0800622 retrier,
623 /*maximumOpenFiles=*/ -1);
George Gensure3c9089b2019-05-02 07:07:55 -0700624
625 byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
626 new Random().nextBytes(blob);
627 // provide only enough data to write a single chunk
628 InputStream in = new ByteArrayInputStream(blob, 0, CHUNK_SIZE);
629
630 Chunker chunker = Chunker.builder().setInput(blob.length, in).setChunkSize(CHUNK_SIZE).build();
Chi Wanga0fe0d82022-02-02 08:09:44 -0800631 Digest digest = DIGEST_UTIL.compute(blob);
George Gensure3c9089b2019-05-02 07:07:55 -0700632
633 serviceRegistry.addService(
634 new ByteStreamImplBase() {
635 @Override
636 public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
637 streamObserver.onNext(WriteResponse.newBuilder().setCommittedSize(blob.length).build());
638 streamObserver.onCompleted();
639 return new NoopStreamObserver();
640 }
641 });
642
Chi Wanga0fe0d82022-02-02 08:09:44 -0800643 uploader.uploadBlob(context, digest, chunker);
George Gensure3c9089b2019-05-02 07:07:55 -0700644
645 // This test should not have triggered any retries.
cushon4f287b02021-09-28 23:12:54 -0700646 Mockito.verifyNoInteractions(mockBackoff);
George Gensure3c9089b2019-05-02 07:07:55 -0700647 }
648
649 @Test
Alessandro Patti6da80862021-11-11 22:49:37 -0800650 public void incorrectCommittedSizeFailsCompletedUpload() throws Exception {
George Gensure3c9089b2019-05-02 07:07:55 -0700651 RemoteRetrier retrier =
652 TestUtils.newRemoteRetrier(() -> mockBackoff, (e) -> true, retryService);
653 ByteStreamUploader uploader =
654 new ByteStreamUploader(
Chi Wangcb1f9f62020-09-17 23:37:19 -0700655 INSTANCE_NAME,
Benjamin Peterson20c61df2022-07-25 01:37:10 -0700656 referenceCountedChannel,
Chi Wangcb1f9f62020-09-17 23:37:19 -0700657 CallCredentialsProvider.NO_CREDENTIALS,
658 3,
Googler46104c62021-12-10 12:56:03 -0800659 retrier,
660 /*maximumOpenFiles=*/ -1);
George Gensure3c9089b2019-05-02 07:07:55 -0700661
662 byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
663 new Random().nextBytes(blob);
664
665 Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
Chi Wanga0fe0d82022-02-02 08:09:44 -0800666 Digest digest = DIGEST_UTIL.compute(blob);
George Gensure3c9089b2019-05-02 07:07:55 -0700667
668 serviceRegistry.addService(
669 new ByteStreamImplBase() {
670 @Override
671 public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
Alessandro Patti6da80862021-11-11 22:49:37 -0800672 return new StreamObserver<WriteRequest>() {
673 @Override
674 public void onNext(WriteRequest writeRequest) {}
675
676 @Override
677 public void onError(Throwable throwable) {
678 fail("onError should never be called.");
679 }
680
681 @Override
682 public void onCompleted() {
683 WriteResponse response =
684 WriteResponse.newBuilder().setCommittedSize(blob.length + 1).build();
685 streamObserver.onNext(response);
686 streamObserver.onCompleted();
687 }
688 };
George Gensure3c9089b2019-05-02 07:07:55 -0700689 }
690 });
691
692 try {
Chi Wanga0fe0d82022-02-02 08:09:44 -0800693 uploader.uploadBlob(context, digest, chunker);
George Gensure3c9089b2019-05-02 07:07:55 -0700694 fail("Should have thrown an exception.");
695 } catch (IOException e) {
696 // expected
697 }
buchgrdc240042017-07-08 12:47:58 +0200698
699 // This test should not have triggered any retries.
cushon4f287b02021-09-28 23:12:54 -0700700 Mockito.verifyNoInteractions(mockBackoff);
buchgrdc240042017-07-08 12:47:58 +0200701 }
702
Jakob Buchgraber315e5b12018-06-26 05:37:08 -0700703 @Test
Benjamin Petersonde196ab2022-06-13 05:55:16 -0700704 public void incorrectCommittedSizeDoesNotFailIncompleteUpload() throws Exception {
Alessandro Patti6da80862021-11-11 22:49:37 -0800705 RemoteRetrier retrier =
706 TestUtils.newRemoteRetrier(() -> mockBackoff, (e) -> true, retryService);
707 ByteStreamUploader uploader =
708 new ByteStreamUploader(
709 INSTANCE_NAME,
Benjamin Peterson20c61df2022-07-25 01:37:10 -0700710 referenceCountedChannel,
Alessandro Patti6da80862021-11-11 22:49:37 -0800711 CallCredentialsProvider.NO_CREDENTIALS,
712 300,
Googler46104c62021-12-10 12:56:03 -0800713 retrier,
714 /*maximumOpenFiles=*/ -1);
Alessandro Patti6da80862021-11-11 22:49:37 -0800715
716 byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
717 new Random().nextBytes(blob);
718
719 Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
Chi Wanga0fe0d82022-02-02 08:09:44 -0800720 Digest digest = DIGEST_UTIL.compute(blob);
Alessandro Patti6da80862021-11-11 22:49:37 -0800721
722 serviceRegistry.addService(
723 new ByteStreamImplBase() {
724 @Override
725 public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
726 streamObserver.onNext(WriteResponse.newBuilder().setCommittedSize(CHUNK_SIZE).build());
727 streamObserver.onCompleted();
728 return new NoopStreamObserver();
729 }
730 });
731
Chi Wanga0fe0d82022-02-02 08:09:44 -0800732 uploader.uploadBlob(context, digest, chunker);
Alessandro Patti6da80862021-11-11 22:49:37 -0800733 }
734
735 @Test
buchgrdc240042017-07-08 12:47:58 +0200736 public void multipleBlobsUploadShouldWork() throws Exception {
buchgr44e40bc2017-12-04 10:44:47 -0800737 RemoteRetrier retrier =
Jakob Buchgraberbc06db92019-03-07 00:21:52 -0800738 TestUtils.newRemoteRetrier(() -> new FixedBackoff(1, 0), (e) -> true, retryService);
olaola98104a092019-05-01 06:48:51 -0700739 ByteStreamUploader uploader =
740 new ByteStreamUploader(
741 INSTANCE_NAME,
Benjamin Peterson20c61df2022-07-25 01:37:10 -0700742 referenceCountedChannel,
jcater9d1737d2020-09-20 18:16:39 -0700743 CallCredentialsProvider.NO_CREDENTIALS,
744 /* callTimeoutSecs= */ 60,
Googler46104c62021-12-10 12:56:03 -0800745 retrier,
746 /*maximumOpenFiles=*/ -1);
buchgrdc240042017-07-08 12:47:58 +0200747
Jakob Buchgraber315e5b12018-06-26 05:37:08 -0700748 int numUploads = 10;
George Gensure3c9089b2019-05-02 07:07:55 -0700749 Map<HashCode, byte[]> blobsByHash = Maps.newHashMap();
Chi Wanga0fe0d82022-02-02 08:09:44 -0800750 Map<Digest, Chunker> chunkers = Maps.newHashMapWithExpectedSize(numUploads);
buchgrdc240042017-07-08 12:47:58 +0200751 Random rand = new Random();
752 for (int i = 0; i < numUploads; i++) {
753 int blobSize = rand.nextInt(CHUNK_SIZE * 10) + CHUNK_SIZE;
754 byte[] blob = new byte[blobSize];
755 rand.nextBytes(blob);
George Gensure3c9089b2019-05-02 07:07:55 -0700756 Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
Chi Wanga0fe0d82022-02-02 08:09:44 -0800757 Digest digest = DIGEST_UTIL.compute(blob);
758 chunkers.put(digest, chunker);
759 blobsByHash.put(HashCode.fromString(digest.getHash()), blob);
buchgrdc240042017-07-08 12:47:58 +0200760 }
761
buchgrb50fe862018-07-12 04:01:45 -0700762 serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash));
buchgrdc240042017-07-08 12:47:58 +0200763
Chi Wanga0fe0d82022-02-02 08:09:44 -0800764 uploader.uploadBlobs(context, chunkers);
buchgrdc240042017-07-08 12:47:58 +0200765 }
766
buchgrff008f42018-06-02 14:13:43 -0700767 @Test
Googler46104c62021-12-10 12:56:03 -0800768 public void tooManyFilesIOException_adviseMaximumOpenFilesFlag() throws Exception {
769 RemoteRetrier retrier =
770 TestUtils.newRemoteRetrier(() -> new FixedBackoff(1, 0), (e) -> true, retryService);
771 ByteStreamUploader uploader =
772 new ByteStreamUploader(
773 INSTANCE_NAME,
Benjamin Peterson20c61df2022-07-25 01:37:10 -0700774 referenceCountedChannel,
Googler46104c62021-12-10 12:56:03 -0800775 CallCredentialsProvider.NO_CREDENTIALS,
776 /* callTimeoutSecs= */ 60,
777 retrier,
778 /*maximumOpenFiles=*/ -1);
779 byte[] blob = new byte[CHUNK_SIZE];
780 Chunker chunker = Mockito.mock(Chunker.class);
Chi Wanga0fe0d82022-02-02 08:09:44 -0800781 Digest digest = DIGEST_UTIL.compute(blob);
Benjamin Petersonc26ec2d2022-06-15 02:27:25 -0700782 Mockito.doThrow(new IOException("Too many open files")).when(chunker).seek(0);
Chi Wanga0fe0d82022-02-02 08:09:44 -0800783 Mockito.when(chunker.getSize()).thenReturn(digest.getSizeBytes());
Benjamin Petersonc26ec2d2022-06-15 02:27:25 -0700784 serviceRegistry.addService(new MaybeFailOnceUploadService(ImmutableMap.of()));
Googler46104c62021-12-10 12:56:03 -0800785
Benjamin Petersonc26ec2d2022-06-15 02:27:25 -0700786 String newMessage =
787 "An IOException was thrown because the process opened too many files. We recommend setting"
788 + " --bep_maximum_open_remote_upload_files flag to a number lower than your system"
789 + " default (run 'ulimit -a' for *nix-based operating systems). Original error message:"
790 + " Too many open files";
791 assertThat(assertThrows(IOException.class, () -> uploader.uploadBlob(context, digest, chunker)))
792 .hasMessageThat()
793 .isEqualTo(newMessage);
Googler46104c62021-12-10 12:56:03 -0800794 }
795
796 @Test
797 public void availablePermitsOpenFileSemaphore_fewerPermitsThanUploads_endWithAllPermits()
798 throws Exception {
799 RemoteRetrier retrier =
800 TestUtils.newRemoteRetrier(() -> new FixedBackoff(1, 0), (e) -> true, retryService);
801 // number of permits is less than number of uploads to affirm permit is released
802 int maximumOpenFiles = 999;
803 ByteStreamUploader uploader =
804 new ByteStreamUploader(
805 INSTANCE_NAME,
Benjamin Peterson20c61df2022-07-25 01:37:10 -0700806 referenceCountedChannel,
Googler46104c62021-12-10 12:56:03 -0800807 CallCredentialsProvider.NO_CREDENTIALS,
808 /* callTimeoutSecs= */ 60,
809 retrier,
810 maximumOpenFiles);
811
812 assertThat(uploader.getOpenedFilePermits().availablePermits()).isEqualTo(999);
813
814 CustomFileTracker customFileTracker = new CustomFileTracker(maximumOpenFiles);
815 int numUploads = 1000;
816 Map<HashCode, byte[]> blobsByHash = Maps.newHashMap();
Chi Wanga0fe0d82022-02-02 08:09:44 -0800817 Map<Digest, Chunker> chunkers = Maps.newHashMapWithExpectedSize(numUploads);
Googler46104c62021-12-10 12:56:03 -0800818 Random rand = new Random();
819 for (int i = 0; i < numUploads; i++) {
820 int blobSize = rand.nextInt(CHUNK_SIZE * 10) + CHUNK_SIZE;
821 byte[] blob = new byte[blobSize];
822 rand.nextBytes(blob);
823 Chunker chunker =
824 TestChunker.builder(customFileTracker).setInput(blob).setChunkSize(CHUNK_SIZE).build();
Chi Wanga0fe0d82022-02-02 08:09:44 -0800825 Digest digest = DIGEST_UTIL.compute(blob);
826 chunkers.put(digest, chunker);
827 blobsByHash.put(HashCode.fromString(digest.getHash()), blob);
Googler46104c62021-12-10 12:56:03 -0800828 }
829
830 serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash));
831
Chi Wanga0fe0d82022-02-02 08:09:44 -0800832 uploader.uploadBlobs(context, chunkers);
Googler46104c62021-12-10 12:56:03 -0800833
834 assertThat(uploader.getOpenedFilePermits().availablePermits()).isEqualTo(maximumOpenFiles);
835 }
836
837 @Test
838 public void noMaximumOpenFilesFlags_nullSemaphore() throws Exception {
839 RemoteRetrier retrier =
840 TestUtils.newRemoteRetrier(() -> new FixedBackoff(1, 0), (e) -> true, retryService);
841 ByteStreamUploader uploader =
842 new ByteStreamUploader(
843 INSTANCE_NAME,
Benjamin Peterson20c61df2022-07-25 01:37:10 -0700844 referenceCountedChannel,
Googler46104c62021-12-10 12:56:03 -0800845 CallCredentialsProvider.NO_CREDENTIALS,
846 /* callTimeoutSecs= */ 60,
847 retrier,
848 /*maximumOpenFiles=*/ -1);
849 assertThat(uploader.getOpenedFilePermits()).isNull();
850
851 int numUploads = 10;
852 Map<HashCode, byte[]> blobsByHash = Maps.newHashMap();
Chi Wanga0fe0d82022-02-02 08:09:44 -0800853 Map<Digest, Chunker> chunkers = Maps.newHashMapWithExpectedSize(numUploads);
Googler46104c62021-12-10 12:56:03 -0800854 Random rand = new Random();
855 for (int i = 0; i < numUploads; i++) {
856 int blobSize = rand.nextInt(CHUNK_SIZE * 10) + CHUNK_SIZE;
857 byte[] blob = new byte[blobSize];
858 rand.nextBytes(blob);
859 Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
Chi Wanga0fe0d82022-02-02 08:09:44 -0800860 Digest digest = DIGEST_UTIL.compute(blob);
861 chunkers.put(digest, chunker);
862 blobsByHash.put(HashCode.fromString(digest.getHash()), blob);
Googler46104c62021-12-10 12:56:03 -0800863 }
864
865 serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash));
866
Chi Wanga0fe0d82022-02-02 08:09:44 -0800867 uploader.uploadBlobs(context, chunkers);
Googler46104c62021-12-10 12:56:03 -0800868 assertThat(uploader.getOpenedFilePermits()).isNull();
869 }
870
871 @Test
olaola6f32d5a2017-09-20 17:12:19 +0200872 public void contextShouldBePreservedUponRetries() throws Exception {
olaola6f32d5a2017-09-20 17:12:19 +0200873 // We upload blobs with different context, and retry 3 times for each upload.
874 // We verify that the correct metadata is passed to the server with every blob.
buchgr44e40bc2017-12-04 10:44:47 -0800875 RemoteRetrier retrier =
Jakob Buchgraberbc06db92019-03-07 00:21:52 -0800876 TestUtils.newRemoteRetrier(() -> new FixedBackoff(5, 0), (e) -> true, retryService);
olaola98104a092019-05-01 06:48:51 -0700877 ByteStreamUploader uploader =
878 new ByteStreamUploader(
879 INSTANCE_NAME,
Benjamin Peterson20c61df2022-07-25 01:37:10 -0700880 referenceCountedChannel,
jcater9d1737d2020-09-20 18:16:39 -0700881 CallCredentialsProvider.NO_CREDENTIALS,
882 /* callTimeoutSecs= */ 60,
Googler46104c62021-12-10 12:56:03 -0800883 retrier,
884 /*maximumOpenFiles=*/ -1);
olaola6f32d5a2017-09-20 17:12:19 +0200885
886 List<String> toUpload = ImmutableList.of("aaaaaaaaaa", "bbbbbbbbbb", "cccccccccc");
George Gensure3c9089b2019-05-02 07:07:55 -0700887 Map<Digest, Chunker> chunkers = Maps.newHashMapWithExpectedSize(toUpload.size());
888 Map<String, Integer> uploadsFailed = Maps.newHashMap();
olaola6f32d5a2017-09-20 17:12:19 +0200889 for (String s : toUpload) {
George Gensure3c9089b2019-05-02 07:07:55 -0700890 Chunker chunker = Chunker.builder().setInput(s.getBytes(UTF_8)).setChunkSize(3).build();
891 Digest digest = DIGEST_UTIL.computeAsUtf8(s);
892 chunkers.put(digest, chunker);
893 uploadsFailed.put(digest.getHash(), 0);
olaola6f32d5a2017-09-20 17:12:19 +0200894 }
895
896 BindableService bsService =
897 new ByteStreamImplBase() {
898 @Override
899 public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
900 return new StreamObserver<WriteRequest>() {
901
902 private String digestHash;
903
904 @Override
905 public void onNext(WriteRequest writeRequest) {
906 String resourceName = writeRequest.getResourceName();
907 if (!resourceName.isEmpty()) {
908 String[] components = resourceName.split("/");
909 assertThat(components).hasLength(6);
910 digestHash = components[4];
911 }
912 assertThat(digestHash).isNotNull();
913 RequestMetadata meta = TracingMetadataUtils.fromCurrentContext();
914 assertThat(meta.getCorrelatedInvocationsId()).isEqualTo("build-req-id");
915 assertThat(meta.getToolInvocationId()).isEqualTo("command-id");
916 assertThat(meta.getActionId()).isEqualTo(digestHash);
917 assertThat(meta.getToolDetails().getToolName()).isEqualTo("bazel");
918 assertThat(meta.getToolDetails().getToolVersion())
919 .isEqualTo(BlazeVersionInfo.instance().getVersion());
920 synchronized (this) {
921 Integer numFailures = uploadsFailed.get(digestHash);
922 if (numFailures < 3) {
923 uploadsFailed.put(digestHash, numFailures + 1);
924 response.onError(Status.INTERNAL.asException());
925 return;
926 }
927 }
928 }
929
930 @Override
931 public void onError(Throwable throwable) {
932 fail("onError should never be called.");
933 }
934
935 @Override
936 public void onCompleted() {
937 response.onNext(WriteResponse.newBuilder().setCommittedSize(10).build());
938 response.onCompleted();
939 }
940 };
941 }
George Gensure3c9089b2019-05-02 07:07:55 -0700942
943 @Override
944 public void queryWriteStatus(
945 QueryWriteStatusRequest request, StreamObserver<QueryWriteStatusResponse> response) {
946 response.onNext(
947 QueryWriteStatusResponse.newBuilder()
948 .setCommittedSize(0)
949 .setComplete(false)
950 .build());
951 response.onCompleted();
952 }
olaola6f32d5a2017-09-20 17:12:19 +0200953 };
954 serviceRegistry.addService(
955 ServerInterceptors.intercept(
956 bsService, new TracingMetadataUtils.ServerHeadersInterceptor()));
957
958 List<ListenableFuture<Void>> uploads = new ArrayList<>();
959
George Gensure3c9089b2019-05-02 07:07:55 -0700960 for (Map.Entry<Digest, Chunker> chunkerEntry : chunkers.entrySet()) {
961 Digest actionDigest = chunkerEntry.getKey();
Googler37ee2522021-01-28 22:54:20 -0800962 RequestMetadata metadata =
963 TracingMetadataUtils.buildMetadata(
964 "build-req-id",
965 "command-id",
Daniel Wagner-Halla750a562021-04-15 19:16:44 -0700966 DIGEST_UTIL.asActionKey(actionDigest).getDigest().getHash(),
967 null);
Googler37ee2522021-01-28 22:54:20 -0800968 RemoteActionExecutionContext remoteActionExecutionContext =
Googlerf8d49fa2021-01-29 00:03:35 -0800969 RemoteActionExecutionContext.create(metadata);
Googler37ee2522021-01-28 22:54:20 -0800970 uploads.add(
971 uploader.uploadBlobAsync(
Chi Wanga0fe0d82022-02-02 08:09:44 -0800972 remoteActionExecutionContext, actionDigest, chunkerEntry.getValue()));
olaola6f32d5a2017-09-20 17:12:19 +0200973 }
974
975 for (ListenableFuture<Void> upload : uploads) {
976 upload.get();
977 }
olaola6f32d5a2017-09-20 17:12:19 +0200978 }
979
Jakob Buchgraber315e5b12018-06-26 05:37:08 -0700980 @Test
Alessandro Patti52c87732020-03-03 04:18:12 -0800981 public void customHeadersAreAttachedToRequest() throws Exception {
982 RemoteRetrier retrier =
983 TestUtils.newRemoteRetrier(() -> new FixedBackoff(1, 0), (e) -> true, retryService);
984
985 Metadata metadata = new Metadata();
986 metadata.put(Metadata.Key.of("Key1", Metadata.ASCII_STRING_MARSHALLER), "Value1");
987 metadata.put(Metadata.Key.of("Key2", Metadata.ASCII_STRING_MARSHALLER), "Value2");
988
Benjamin Peterson20c61df2022-07-25 01:37:10 -0700989 referenceCountedChannel.release();
990 referenceCountedChannel =
991 new ReferenceCountedChannel(
992 new ChannelConnectionFactory() {
993 @Override
994 public Single<? extends ChannelConnection> create() {
995 return Single.just(
996 new ChannelConnection(
997 InProcessChannelBuilder.forName(serverName)
998 .intercept(MetadataUtils.newAttachHeadersInterceptor(metadata))
999 .build()));
1000 }
1001
1002 @Override
1003 public int maxConcurrency() {
1004 return 100;
1005 }
1006 });
Alessandro Patti52c87732020-03-03 04:18:12 -08001007 ByteStreamUploader uploader =
1008 new ByteStreamUploader(
1009 INSTANCE_NAME,
Benjamin Peterson20c61df2022-07-25 01:37:10 -07001010 referenceCountedChannel,
jcater9d1737d2020-09-20 18:16:39 -07001011 CallCredentialsProvider.NO_CREDENTIALS,
1012 /* callTimeoutSecs= */ 60,
Googler46104c62021-12-10 12:56:03 -08001013 retrier,
1014 /*maximumOpenFiles=*/ -1);
Alessandro Patti52c87732020-03-03 04:18:12 -08001015
1016 byte[] blob = new byte[CHUNK_SIZE];
1017 Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
Chi Wanga0fe0d82022-02-02 08:09:44 -08001018 Digest digest = DIGEST_UTIL.compute(blob);
Alessandro Patti52c87732020-03-03 04:18:12 -08001019
1020 serviceRegistry.addService(
1021 ServerInterceptors.intercept(
1022 new ByteStreamImplBase() {
1023 @Override
1024 public StreamObserver<WriteRequest> write(
1025 StreamObserver<WriteResponse> streamObserver) {
1026 return new StreamObserver<WriteRequest>() {
1027 @Override
1028 public void onNext(WriteRequest writeRequest) {}
1029
1030 @Override
1031 public void onError(Throwable throwable) {
1032 fail("onError should never be called.");
1033 }
1034
1035 @Override
1036 public void onCompleted() {
1037 WriteResponse response =
1038 WriteResponse.newBuilder().setCommittedSize(blob.length).build();
1039 streamObserver.onNext(response);
1040 streamObserver.onCompleted();
1041 }
1042 };
1043 }
1044 },
1045 new ServerInterceptor() {
1046 @Override
1047 public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
1048 ServerCall<ReqT, RespT> call,
1049 Metadata metadata,
1050 ServerCallHandler<ReqT, RespT> next) {
1051 assertThat(metadata.get(Metadata.Key.of("Key1", Metadata.ASCII_STRING_MARSHALLER)))
1052 .isEqualTo("Value1");
1053 assertThat(metadata.get(Metadata.Key.of("Key2", Metadata.ASCII_STRING_MARSHALLER)))
1054 .isEqualTo("Value2");
1055 assertThat(metadata.get(Metadata.Key.of("Key3", Metadata.ASCII_STRING_MARSHALLER)))
1056 .isEqualTo(null);
1057 return next.startCall(call, metadata);
1058 }
1059 }));
1060
Chi Wanga0fe0d82022-02-02 08:09:44 -08001061 uploader.uploadBlob(context, digest, chunker);
buchgrdc240042017-07-08 12:47:58 +02001062 }
1063
Jakob Buchgraber315e5b12018-06-26 05:37:08 -07001064 @Test
buchgrdc240042017-07-08 12:47:58 +02001065 public void errorsShouldBeReported() throws IOException, InterruptedException {
buchgr44e40bc2017-12-04 10:44:47 -08001066 RemoteRetrier retrier =
Jakob Buchgraberbc06db92019-03-07 00:21:52 -08001067 TestUtils.newRemoteRetrier(() -> new FixedBackoff(1, 10), (e) -> true, retryService);
olaola98104a092019-05-01 06:48:51 -07001068 ByteStreamUploader uploader =
1069 new ByteStreamUploader(
1070 INSTANCE_NAME,
Benjamin Peterson20c61df2022-07-25 01:37:10 -07001071 referenceCountedChannel,
jcater9d1737d2020-09-20 18:16:39 -07001072 CallCredentialsProvider.NO_CREDENTIALS,
1073 /* callTimeoutSecs= */ 60,
Googler46104c62021-12-10 12:56:03 -08001074 retrier,
1075 /*maximumOpenFiles=*/ -1);
buchgrdc240042017-07-08 12:47:58 +02001076
1077 byte[] blob = new byte[CHUNK_SIZE];
George Gensure3c9089b2019-05-02 07:07:55 -07001078 Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
Chi Wanga0fe0d82022-02-02 08:09:44 -08001079 Digest digest = DIGEST_UTIL.compute(blob);
buchgrdc240042017-07-08 12:47:58 +02001080
Googler37ee2522021-01-28 22:54:20 -08001081 serviceRegistry.addService(
1082 new ByteStreamImplBase() {
1083 @Override
1084 public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
1085 response.onError(Status.INTERNAL.asException());
1086 return new NoopStreamObserver();
1087 }
1088 });
buchgrdc240042017-07-08 12:47:58 +02001089
George Gensure3c9089b2019-05-02 07:07:55 -07001090 try {
Chi Wanga0fe0d82022-02-02 08:09:44 -08001091 uploader.uploadBlob(context, digest, chunker);
George Gensure3c9089b2019-05-02 07:07:55 -07001092 fail("Should have thrown an exception.");
1093 } catch (IOException e) {
1094 assertThat(RemoteRetrierUtils.causedByStatus(e, Code.INTERNAL)).isTrue();
1095 }
buchgrdc240042017-07-08 12:47:58 +02001096 }
1097
Jakob Buchgraber315e5b12018-06-26 05:37:08 -07001098 @Test
buchgrdc240042017-07-08 12:47:58 +02001099 public void failureInRetryExecutorShouldBeHandled() throws Exception {
buchgrff008f42018-06-02 14:13:43 -07001100 ListeningScheduledExecutorService retryService =
1101 MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
buchgr44e40bc2017-12-04 10:44:47 -08001102 RemoteRetrier retrier =
Jakob Buchgraberbc06db92019-03-07 00:21:52 -08001103 TestUtils.newRemoteRetrier(() -> new FixedBackoff(1, 10), (e) -> true, retryService);
olaola98104a092019-05-01 06:48:51 -07001104 ByteStreamUploader uploader =
1105 new ByteStreamUploader(
1106 INSTANCE_NAME,
Benjamin Peterson20c61df2022-07-25 01:37:10 -07001107 referenceCountedChannel,
jcater9d1737d2020-09-20 18:16:39 -07001108 CallCredentialsProvider.NO_CREDENTIALS,
1109 /* callTimeoutSecs= */ 60,
Googler46104c62021-12-10 12:56:03 -08001110 retrier,
1111 /*maximumOpenFiles=*/ -1);
buchgrdc240042017-07-08 12:47:58 +02001112
Googler37ee2522021-01-28 22:54:20 -08001113 serviceRegistry.addService(
1114 new ByteStreamImplBase() {
1115 @Override
1116 public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
1117 // Immediately fail the call, so that it is retried.
1118 response.onError(Status.ABORTED.asException());
1119 return new NoopStreamObserver();
1120 }
1121 });
buchgrdc240042017-07-08 12:47:58 +02001122
1123 retryService.shutdownNow();
1124 // Random very high timeout, as the test will timeout by itself.
1125 retryService.awaitTermination(1, TimeUnit.DAYS);
1126 assertThat(retryService.isShutdown()).isTrue();
1127
1128 byte[] blob = new byte[1];
George Gensure3c9089b2019-05-02 07:07:55 -07001129 Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
Chi Wanga0fe0d82022-02-02 08:09:44 -08001130 Digest digest = DIGEST_UTIL.compute(blob);
George Gensure3c9089b2019-05-02 07:07:55 -07001131 try {
Chi Wanga0fe0d82022-02-02 08:09:44 -08001132 uploader.uploadBlob(context, digest, chunker);
George Gensure3c9089b2019-05-02 07:07:55 -07001133 fail("Should have thrown an exception.");
1134 } catch (IOException e) {
1135 assertThat(e).hasCauseThat().isInstanceOf(RejectedExecutionException.class);
1136 }
buchgrdc240042017-07-08 12:47:58 +02001137 }
1138
Jakob Buchgraber315e5b12018-06-26 05:37:08 -07001139 @Test
buchgrdc240042017-07-08 12:47:58 +02001140 public void resourceNameWithoutInstanceName() throws Exception {
buchgr44e40bc2017-12-04 10:44:47 -08001141 RemoteRetrier retrier =
Jakob Buchgraberbc06db92019-03-07 00:21:52 -08001142 TestUtils.newRemoteRetrier(() -> mockBackoff, (e) -> true, retryService);
buchgrdc240042017-07-08 12:47:58 +02001143 ByteStreamUploader uploader =
olaola98104a092019-05-01 06:48:51 -07001144 new ByteStreamUploader(
jcater9d1737d2020-09-20 18:16:39 -07001145 /* instanceName= */ null,
Benjamin Peterson20c61df2022-07-25 01:37:10 -07001146 referenceCountedChannel,
jcater9d1737d2020-09-20 18:16:39 -07001147 CallCredentialsProvider.NO_CREDENTIALS,
1148 /* callTimeoutSecs= */ 60,
Googler46104c62021-12-10 12:56:03 -08001149 retrier,
1150 /*maximumOpenFiles=*/ -1);
buchgrdc240042017-07-08 12:47:58 +02001151
Googler37ee2522021-01-28 22:54:20 -08001152 serviceRegistry.addService(
1153 new ByteStreamImplBase() {
buchgrdc240042017-07-08 12:47:58 +02001154 @Override
Googler37ee2522021-01-28 22:54:20 -08001155 public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
1156 return new StreamObserver<WriteRequest>() {
1157 @Override
1158 public void onNext(WriteRequest writeRequest) {
1159 // Test that the resource name doesn't start with an instance name.
1160 assertThat(writeRequest.getResourceName()).startsWith("uploads/");
1161 }
buchgrdc240042017-07-08 12:47:58 +02001162
Googler37ee2522021-01-28 22:54:20 -08001163 @Override
1164 public void onError(Throwable throwable) {}
buchgrdc240042017-07-08 12:47:58 +02001165
Googler37ee2522021-01-28 22:54:20 -08001166 @Override
1167 public void onCompleted() {
1168 response.onNext(WriteResponse.newBuilder().setCommittedSize(1).build());
1169 response.onCompleted();
1170 }
1171 };
buchgrdc240042017-07-08 12:47:58 +02001172 }
Googler37ee2522021-01-28 22:54:20 -08001173 });
buchgrdc240042017-07-08 12:47:58 +02001174
1175 byte[] blob = new byte[1];
George Gensure3c9089b2019-05-02 07:07:55 -07001176 Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
Chi Wanga0fe0d82022-02-02 08:09:44 -08001177 Digest digest = DIGEST_UTIL.compute(blob);
buchgrdc240042017-07-08 12:47:58 +02001178
Chi Wanga0fe0d82022-02-02 08:09:44 -08001179 uploader.uploadBlob(context, digest, chunker);
buchgrdc240042017-07-08 12:47:58 +02001180 }
1181
Jakob Buchgraber315e5b12018-06-26 05:37:08 -07001182 @Test
buchgrdc240042017-07-08 12:47:58 +02001183 public void nonRetryableStatusShouldNotBeRetried() throws Exception {
buchgr44e40bc2017-12-04 10:44:47 -08001184 RemoteRetrier retrier =
Jakob Buchgraberbc06db92019-03-07 00:21:52 -08001185 TestUtils.newRemoteRetrier(
1186 () -> new FixedBackoff(1, 0), /* No Status is retriable. */ (e) -> false, retryService);
buchgrdc240042017-07-08 12:47:58 +02001187 ByteStreamUploader uploader =
olaola98104a092019-05-01 06:48:51 -07001188 new ByteStreamUploader(
jcater9d1737d2020-09-20 18:16:39 -07001189 /* instanceName= */ null,
Benjamin Peterson20c61df2022-07-25 01:37:10 -07001190 referenceCountedChannel,
jcater9d1737d2020-09-20 18:16:39 -07001191 CallCredentialsProvider.NO_CREDENTIALS,
1192 /* callTimeoutSecs= */ 60,
Googler46104c62021-12-10 12:56:03 -08001193 retrier,
1194 /*maximumOpenFiles=*/ -1);
buchgrdc240042017-07-08 12:47:58 +02001195
1196 AtomicInteger numCalls = new AtomicInteger();
1197
Googler37ee2522021-01-28 22:54:20 -08001198 serviceRegistry.addService(
1199 new ByteStreamImplBase() {
1200 @Override
1201 public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
1202 numCalls.incrementAndGet();
1203 response.onError(Status.INTERNAL.asException());
1204 return new NoopStreamObserver();
1205 }
1206 });
buchgrdc240042017-07-08 12:47:58 +02001207
1208 byte[] blob = new byte[1];
George Gensure3c9089b2019-05-02 07:07:55 -07001209 Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
Chi Wanga0fe0d82022-02-02 08:09:44 -08001210 Digest digest = DIGEST_UTIL.compute(blob);
buchgrdc240042017-07-08 12:47:58 +02001211
George Gensure3c9089b2019-05-02 07:07:55 -07001212 try {
Chi Wanga0fe0d82022-02-02 08:09:44 -08001213 uploader.uploadBlob(context, digest, chunker);
George Gensure3c9089b2019-05-02 07:07:55 -07001214 fail("Should have thrown an exception.");
1215 } catch (IOException e) {
1216 assertThat(numCalls.get()).isEqualTo(1);
1217 }
buchgrdc240042017-07-08 12:47:58 +02001218 }
1219
buchgrb50fe862018-07-12 04:01:45 -07001220 @Test
Chi Wangcb1f9f62020-09-17 23:37:19 -07001221 public void unauthenticatedErrorShouldNotBeRetried() throws Exception {
Chi Wangcb1f9f62020-09-17 23:37:19 -07001222 RemoteRetrier retrier =
1223 TestUtils.newRemoteRetrier(
1224 () -> mockBackoff, RemoteRetrier.RETRIABLE_GRPC_ERRORS, retryService);
1225
1226 AtomicInteger refreshTimes = new AtomicInteger();
1227 CallCredentialsProvider callCredentialsProvider =
1228 new CallCredentialsProvider() {
1229 @Nullable
1230 @Override
1231 public CallCredentials getCallCredentials() {
1232 return null;
1233 }
1234
1235 @Override
1236 public void refresh() throws IOException {
1237 refreshTimes.incrementAndGet();
1238 }
1239 };
1240 ByteStreamUploader uploader =
1241 new ByteStreamUploader(
1242 INSTANCE_NAME,
Benjamin Peterson20c61df2022-07-25 01:37:10 -07001243 referenceCountedChannel,
jcater9d1737d2020-09-20 18:16:39 -07001244 callCredentialsProvider,
1245 /* callTimeoutSecs= */ 60,
Googler46104c62021-12-10 12:56:03 -08001246 retrier,
1247 /*maximumOpenFiles=*/ -1);
Chi Wangcb1f9f62020-09-17 23:37:19 -07001248
1249 byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
1250 new Random().nextBytes(blob);
1251
1252 Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
Chi Wanga0fe0d82022-02-02 08:09:44 -08001253 Digest digest = DIGEST_UTIL.compute(blob);
Chi Wangcb1f9f62020-09-17 23:37:19 -07001254
1255 AtomicInteger numUploads = new AtomicInteger();
1256 serviceRegistry.addService(
1257 new ByteStreamImplBase() {
1258 @Override
1259 public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
1260 numUploads.incrementAndGet();
1261
1262 streamObserver.onError(Status.UNAUTHENTICATED.asException());
1263 return new NoopStreamObserver();
1264 }
1265 });
1266
Chi Wanga0fe0d82022-02-02 08:09:44 -08001267 assertThrows(IOException.class, () -> uploader.uploadBlob(context, digest, chunker));
Chi Wangcb1f9f62020-09-17 23:37:19 -07001268
1269 assertThat(refreshTimes.get()).isEqualTo(1);
1270 assertThat(numUploads.get()).isEqualTo(2);
1271
1272 // This test should not have triggered any retries.
cushon4f287b02021-09-28 23:12:54 -07001273 Mockito.verifyNoInteractions(mockBackoff);
Chi Wangcb1f9f62020-09-17 23:37:19 -07001274 }
1275
1276 @Test
1277 public void shouldRefreshCredentialsOnAuthenticationError() throws Exception {
Chi Wangcb1f9f62020-09-17 23:37:19 -07001278 RemoteRetrier retrier =
1279 TestUtils.newRemoteRetrier(
1280 () -> mockBackoff, RemoteRetrier.RETRIABLE_GRPC_ERRORS, retryService);
1281
1282 AtomicInteger refreshTimes = new AtomicInteger();
1283 CallCredentialsProvider callCredentialsProvider =
1284 new CallCredentialsProvider() {
1285 @Nullable
1286 @Override
1287 public CallCredentials getCallCredentials() {
1288 return null;
1289 }
1290
1291 @Override
1292 public void refresh() throws IOException {
1293 refreshTimes.incrementAndGet();
1294 }
1295 };
1296 ByteStreamUploader uploader =
1297 new ByteStreamUploader(
1298 INSTANCE_NAME,
Benjamin Peterson20c61df2022-07-25 01:37:10 -07001299 referenceCountedChannel,
jcater9d1737d2020-09-20 18:16:39 -07001300 callCredentialsProvider,
1301 /* callTimeoutSecs= */ 60,
Googler46104c62021-12-10 12:56:03 -08001302 retrier,
1303 /*maximumOpenFiles=*/ -1);
Chi Wangcb1f9f62020-09-17 23:37:19 -07001304
1305 byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
1306 new Random().nextBytes(blob);
1307
1308 Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
Chi Wanga0fe0d82022-02-02 08:09:44 -08001309 Digest digest = DIGEST_UTIL.compute(blob);
Chi Wangcb1f9f62020-09-17 23:37:19 -07001310
1311 AtomicInteger numUploads = new AtomicInteger();
1312 serviceRegistry.addService(
1313 new ByteStreamImplBase() {
1314 @Override
1315 public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
1316 numUploads.incrementAndGet();
1317
1318 if (refreshTimes.get() == 0) {
1319 streamObserver.onError(Status.UNAUTHENTICATED.asException());
1320 return new NoopStreamObserver();
1321 }
1322
1323 return new StreamObserver<WriteRequest>() {
1324 long nextOffset = 0;
1325
1326 @Override
1327 public void onNext(WriteRequest writeRequest) {
1328 nextOffset += writeRequest.getData().size();
1329 boolean lastWrite = blob.length == nextOffset;
1330 assertThat(writeRequest.getFinishWrite()).isEqualTo(lastWrite);
1331 }
1332
1333 @Override
1334 public void onError(Throwable throwable) {
1335 fail("onError should never be called.");
1336 }
1337
1338 @Override
1339 public void onCompleted() {
1340 assertThat(nextOffset).isEqualTo(blob.length);
1341
1342 WriteResponse response =
1343 WriteResponse.newBuilder().setCommittedSize(nextOffset).build();
1344 streamObserver.onNext(response);
1345 streamObserver.onCompleted();
1346 }
1347 };
1348 }
1349 });
1350
Chi Wanga0fe0d82022-02-02 08:09:44 -08001351 uploader.uploadBlob(context, digest, chunker);
Chi Wangcb1f9f62020-09-17 23:37:19 -07001352
1353 assertThat(refreshTimes.get()).isEqualTo(1);
1354 assertThat(numUploads.get()).isEqualTo(2);
1355
1356 // This test should not have triggered any retries.
cushon4f287b02021-09-28 23:12:54 -07001357 Mockito.verifyNoInteractions(mockBackoff);
Chi Wangcb1f9f62020-09-17 23:37:19 -07001358 }
1359
Alessandro Patti6da80862021-11-11 22:49:37 -08001360 @Test
Benjamin Peterson72d52552021-12-22 04:28:21 -08001361 public void failureAfterUploadCompletes() throws Exception {
1362 AtomicInteger numUploads = new AtomicInteger();
1363 RemoteRetrier retrier =
1364 TestUtils.newRemoteRetrier(
1365 () -> mockBackoff, e -> e instanceof StatusRuntimeException, retryService);
1366 ByteStreamUploader uploader =
1367 new ByteStreamUploader(
1368 INSTANCE_NAME,
Benjamin Peterson20c61df2022-07-25 01:37:10 -07001369 referenceCountedChannel,
Benjamin Peterson72d52552021-12-22 04:28:21 -08001370 CallCredentialsProvider.NO_CREDENTIALS,
1371 /* callTimeoutSecs= */ 60,
1372 retrier,
1373 -1);
1374
1375 byte[] blob = new byte[CHUNK_SIZE - 1];
1376 new Random().nextBytes(blob);
1377
1378 serviceRegistry.addService(
1379 new ByteStreamImplBase() {
1380 @Override
1381 public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
1382 numUploads.incrementAndGet();
1383 return new StreamObserver<WriteRequest>() {
1384 @Override
1385 public void onNext(WriteRequest writeRequest) {}
1386
1387 @Override
1388 public void onError(Throwable throwable) {
1389 fail("onError should never be called.");
1390 }
1391
1392 @Override
1393 public void onCompleted() {
1394 streamObserver.onNext(
1395 WriteResponse.newBuilder().setCommittedSize(blob.length).build());
1396 streamObserver.onError(Status.UNAVAILABLE.asException());
1397 }
1398 };
1399 }
1400
1401 @Override
1402 public void queryWriteStatus(
1403 QueryWriteStatusRequest request, StreamObserver<QueryWriteStatusResponse> response) {
1404 response.onNext(
1405 QueryWriteStatusResponse.newBuilder()
1406 .setCommittedSize(blob.length)
1407 .setComplete(true)
1408 .build());
1409 response.onCompleted();
1410 }
1411 });
1412
1413 Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
Chi Wanga0fe0d82022-02-02 08:09:44 -08001414 Digest digest = DIGEST_UTIL.compute(blob);
Benjamin Peterson72d52552021-12-22 04:28:21 -08001415
Chi Wanga0fe0d82022-02-02 08:09:44 -08001416 uploader.uploadBlob(context, digest, chunker);
Benjamin Peterson72d52552021-12-22 04:28:21 -08001417
1418 assertThat(numUploads.get()).isEqualTo(1);
1419 }
1420
1421 @Test
Alessandro Patti6da80862021-11-11 22:49:37 -08001422 public void testCompressedUploads() throws Exception {
1423 RemoteRetrier retrier =
1424 TestUtils.newRemoteRetrier(() -> mockBackoff, (e) -> true, retryService);
1425 ByteStreamUploader uploader =
1426 new ByteStreamUploader(
1427 INSTANCE_NAME,
Benjamin Peterson20c61df2022-07-25 01:37:10 -07001428 referenceCountedChannel,
Alessandro Patti6da80862021-11-11 22:49:37 -08001429 CallCredentialsProvider.NO_CREDENTIALS,
1430 /* callTimeoutSecs= */ 60,
Googler46104c62021-12-10 12:56:03 -08001431 retrier,
1432 /*maximumOpenFiles=*/ -1);
Alessandro Patti6da80862021-11-11 22:49:37 -08001433
1434 byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
1435 new Random().nextBytes(blob);
1436
1437 AtomicInteger numUploads = new AtomicInteger();
1438
1439 serviceRegistry.addService(
1440 new ByteStreamImplBase() {
1441 @Override
1442 public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
1443 return new StreamObserver<WriteRequest>() {
1444 ByteArrayOutputStream baos = new ByteArrayOutputStream();
1445 String resourceName = null;
1446
1447 @Override
1448 public void onNext(WriteRequest writeRequest) {
1449 if (!writeRequest.getResourceName().isEmpty()) {
1450 if (resourceName != null) {
1451 assertThat(resourceName).isEqualTo(writeRequest.getResourceName());
1452 } else {
1453 resourceName = writeRequest.getResourceName();
1454 assertThat(resourceName).contains("/compressed-blobs/zstd/");
1455 }
1456 }
1457 try {
1458 writeRequest.getData().writeTo(baos);
1459 if (writeRequest.getFinishWrite()) {
1460 baos.close();
1461 }
1462 } catch (IOException e) {
1463 throw new AssertionError("I/O error on ByteArrayOutputStream.", e);
1464 }
1465 }
1466
1467 @Override
1468 public void onError(Throwable throwable) {
1469 fail("onError should never be called.");
1470 }
1471
1472 @Override
1473 public void onCompleted() {
1474 byte[] data = baos.toByteArray();
1475 try {
1476 ZstdInputStream zis = new ZstdInputStream(new ByteArrayInputStream(data));
1477 byte[] decompressed = ByteString.readFrom(zis).toByteArray();
1478 zis.close();
1479 Digest digest = DIGEST_UTIL.compute(decompressed);
1480
1481 assertThat(blob).hasLength(decompressed.length);
1482 assertThat(resourceName).isNotNull();
1483 assertThat(resourceName)
1484 .endsWith(String.format("/%s/%s", digest.getHash(), digest.getSizeBytes()));
1485
1486 numUploads.incrementAndGet();
1487 } catch (IOException e) {
1488 throw new AssertionError("Failed decompressing data.", e);
1489 } finally {
1490 WriteResponse response =
1491 WriteResponse.newBuilder().setCommittedSize(data.length).build();
1492
1493 streamObserver.onNext(response);
1494 streamObserver.onCompleted();
1495 }
1496 }
1497 };
1498 }
1499 });
1500
1501 Chunker chunker =
1502 Chunker.builder().setInput(blob).setCompressed(true).setChunkSize(CHUNK_SIZE).build();
Chi Wanga0fe0d82022-02-02 08:09:44 -08001503 Digest digest = DIGEST_UTIL.compute(blob);
Alessandro Patti6da80862021-11-11 22:49:37 -08001504
Chi Wanga0fe0d82022-02-02 08:09:44 -08001505 uploader.uploadBlob(context, digest, chunker);
Alessandro Patti6da80862021-11-11 22:49:37 -08001506
1507 // This test should not have triggered any retries.
1508 Mockito.verifyNoInteractions(mockBackoff);
1509
Alessandro Patti6da80862021-11-11 22:49:37 -08001510 assertThat(numUploads.get()).isEqualTo(1);
1511 }
1512
buchgrdc240042017-07-08 12:47:58 +02001513 private static class NoopStreamObserver implements StreamObserver<WriteRequest> {
1514 @Override
Googler37ee2522021-01-28 22:54:20 -08001515 public void onNext(WriteRequest writeRequest) {}
buchgrdc240042017-07-08 12:47:58 +02001516
1517 @Override
Googler37ee2522021-01-28 22:54:20 -08001518 public void onError(Throwable throwable) {}
buchgrdc240042017-07-08 12:47:58 +02001519
1520 @Override
Googler37ee2522021-01-28 22:54:20 -08001521 public void onCompleted() {}
buchgrdc240042017-07-08 12:47:58 +02001522 }
1523
buchgrb50fe862018-07-12 04:01:45 -07001524 static class FixedBackoff implements Retrier.Backoff {
buchgrdc240042017-07-08 12:47:58 +02001525
1526 private final int maxRetries;
1527 private final int delayMillis;
1528
1529 private int retries;
1530
1531 public FixedBackoff(int maxRetries, int delayMillis) {
1532 this.maxRetries = maxRetries;
1533 this.delayMillis = delayMillis;
1534 }
1535
1536 @Override
George Gensure51226172020-12-01 01:33:42 -08001537 public long nextDelayMillis(Exception e) {
buchgrdc240042017-07-08 12:47:58 +02001538 if (retries < maxRetries) {
1539 retries++;
1540 return delayMillis;
1541 }
1542 return -1;
1543 }
1544
1545 @Override
1546 public int getRetryAttempts() {
1547 return retries;
1548 }
1549 }
buchgr7c227992017-07-11 12:13:32 +02001550
buchgrb50fe862018-07-12 04:01:45 -07001551 /**
1552 * An byte stream service where an upload for a given blob may or may not fail on the first
1553 * attempt but is guaranteed to succeed on the second try.
1554 */
1555 static class MaybeFailOnceUploadService extends ByteStreamImplBase {
1556
George Gensure3c9089b2019-05-02 07:07:55 -07001557 private final Map<HashCode, byte[]> blobsByHash;
1558 private final Set<HashCode> uploadsFailedOnce = Collections.synchronizedSet(Sets.newHashSet());
buchgrb50fe862018-07-12 04:01:45 -07001559 private final Random rand = new Random();
1560
George Gensure3c9089b2019-05-02 07:07:55 -07001561 MaybeFailOnceUploadService(Map<HashCode, byte[]> blobsByHash) {
buchgrb50fe862018-07-12 04:01:45 -07001562 this.blobsByHash = blobsByHash;
1563 }
1564
1565 @Override
1566 public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
1567 return new StreamObserver<WriteRequest>() {
1568
George Gensure3c9089b2019-05-02 07:07:55 -07001569 private HashCode digestHash;
buchgrb50fe862018-07-12 04:01:45 -07001570 private byte[] receivedData;
1571 private long nextOffset;
Benjamin Peterson632af2c2022-06-29 05:32:42 -07001572 private boolean failed;
buchgrb50fe862018-07-12 04:01:45 -07001573
1574 @Override
1575 public void onNext(WriteRequest writeRequest) {
1576 if (nextOffset == 0) {
1577 String resourceName = writeRequest.getResourceName();
1578 assertThat(resourceName).isNotEmpty();
1579
1580 String[] components = resourceName.split("/");
1581 assertThat(components).hasLength(6);
George Gensure3c9089b2019-05-02 07:07:55 -07001582 digestHash = HashCode.fromString(components[4]);
buchgrb50fe862018-07-12 04:01:45 -07001583 assertThat(blobsByHash).containsKey(digestHash);
1584 receivedData = new byte[Integer.parseInt(components[5])];
1585 }
1586 assertThat(digestHash).isNotNull();
1587 // An upload for a given blob has a 10% chance to fail once during its lifetime.
1588 // This is to exercise the retry mechanism a bit.
George Gensure3c9089b2019-05-02 07:07:55 -07001589 boolean shouldFail = rand.nextInt(10) == 0 && !uploadsFailedOnce.contains(digestHash);
buchgrb50fe862018-07-12 04:01:45 -07001590 if (shouldFail) {
1591 uploadsFailedOnce.add(digestHash);
1592 response.onError(Status.INTERNAL.asException());
Benjamin Peterson632af2c2022-06-29 05:32:42 -07001593 failed = true;
buchgrb50fe862018-07-12 04:01:45 -07001594 return;
1595 }
1596
1597 ByteString data = writeRequest.getData();
George Gensure3c9089b2019-05-02 07:07:55 -07001598 System.arraycopy(data.toByteArray(), 0, receivedData, (int) nextOffset, data.size());
buchgrb50fe862018-07-12 04:01:45 -07001599 nextOffset += data.size();
1600
1601 boolean lastWrite = nextOffset == receivedData.length;
1602 assertThat(writeRequest.getFinishWrite()).isEqualTo(lastWrite);
1603 }
1604
1605 @Override
1606 public void onError(Throwable throwable) {
1607 fail("onError should never be called.");
1608 }
1609
1610 @Override
1611 public void onCompleted() {
Benjamin Peterson632af2c2022-06-29 05:32:42 -07001612 if (failed) {
1613 return;
1614 }
buchgrb50fe862018-07-12 04:01:45 -07001615 byte[] expectedBlob = blobsByHash.get(digestHash);
1616 assertThat(receivedData).isEqualTo(expectedBlob);
1617
1618 WriteResponse writeResponse =
1619 WriteResponse.newBuilder().setCommittedSize(receivedData.length).build();
1620
1621 response.onNext(writeResponse);
1622 response.onCompleted();
1623 }
1624 };
1625 }
George Gensure3c9089b2019-05-02 07:07:55 -07001626
1627 @Override
1628 public void queryWriteStatus(
1629 QueryWriteStatusRequest request, StreamObserver<QueryWriteStatusResponse> response) {
1630 // force the client to reset the write
1631 response.onNext(
1632 QueryWriteStatusResponse.newBuilder().setCommittedSize(0).setComplete(false).build());
1633 response.onCompleted();
1634 }
buchgrb50fe862018-07-12 04:01:45 -07001635 }
1636
Googler46104c62021-12-10 12:56:03 -08001637 /* Custom Chunker used to track number of open files */
1638 private static class TestChunker extends Chunker {
1639
1640 TestChunker(Supplier<InputStream> dataSupplier, long size, int chunkSize, boolean compressed) {
1641 super(dataSupplier, size, chunkSize, compressed);
1642 }
1643
1644 public static Builder builder(CustomFileTracker customFileTracker) {
1645 return new TestChunkerBuilder(customFileTracker);
1646 }
1647
1648 private static class TestChunkerBuilder extends Chunker.Builder {
1649 private final CustomFileTracker customFileTracker;
1650
1651 TestChunkerBuilder(CustomFileTracker customFileTracker) {
1652 this.customFileTracker = customFileTracker;
1653 }
1654
1655 @Override
1656 public Chunker.Builder setInput(byte[] existingData) {
Chi Wanga0fe0d82022-02-02 08:09:44 -08001657 checkState(this.inputStream == null);
1658 this.size = existingData.length;
Googler46104c62021-12-10 12:56:03 -08001659 return setInputSupplier(
1660 () -> new TestByteArrayInputStream(existingData, customFileTracker));
1661 }
1662 }
1663 }
1664
1665 private static class TestByteArrayInputStream extends ByteArrayInputStream {
1666 private final CustomFileTracker customFileTracker;
1667
1668 TestByteArrayInputStream(byte[] buf, CustomFileTracker customFileTracker) {
1669 super(buf);
1670 this.customFileTracker = customFileTracker;
1671 customFileTracker.incrementOpenFiles();
1672 }
1673
1674 @Override
1675 public void close() throws IOException {
1676 super.close();
1677 customFileTracker.decrementOpenFiles();
1678 }
1679 }
1680
1681 private static class CustomFileTracker {
1682 private final AtomicInteger openFiles = new AtomicInteger(0);
1683 private final int maxOpenFiles;
1684
1685 CustomFileTracker(int maxOpenFiles) {
1686 this.maxOpenFiles = maxOpenFiles;
1687 }
1688
1689 private void incrementOpenFiles() {
1690 openFiles.getAndIncrement();
1691 checkState(openFiles.get() <= maxOpenFiles);
1692 }
1693
1694 private void decrementOpenFiles() {
1695 openFiles.getAndDecrement();
1696 checkState(openFiles.get() >= 0);
1697 }
1698 }
buchgrdc240042017-07-08 12:47:58 +02001699}