// blob: e809b74b7e1314e66737164061c9db78ccce5611
// Copyright 2017 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.remote;

import static com.google.common.truth.Truth.assertThat;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.fail;

import com.google.bytestream.ByteStreamGrpc;
import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase;
import com.google.bytestream.ByteStreamProto.WriteRequest;
import com.google.bytestream.ByteStreamProto.WriteResponse;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.analysis.BlazeVersionInfo;
import com.google.devtools.remoteexecution.v1test.Digest;
import com.google.devtools.remoteexecution.v1test.RequestMetadata;
import com.google.protobuf.ByteString;
import io.grpc.BindableService;
import io.grpc.Channel;
import io.grpc.Context;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerCall;
import io.grpc.ServerCall.Listener;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptors;
import io.grpc.ServerServiceDefinition;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.StreamObserver;
import io.grpc.util.MutableHandlerRegistry;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

72/**
73 * Tests for {@link ByteStreamUploader}.
74 */
75@RunWith(JUnit4.class)
76public class ByteStreamUploaderTest {
77
78 private static final int CHUNK_SIZE = 10;
79 private static final String INSTANCE_NAME = "foo";
80
81 private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
82 private final ListeningScheduledExecutorService retryService =
83 MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
84
85 private Server server;
86 private Channel channel;
olaola6f32d5a2017-09-20 17:12:19 +020087 private Context withEmptyMetadata;
buchgrdc240042017-07-08 12:47:58 +020088
89 @Mock
90 private Retrier.Backoff mockBackoff;
91
92 @Before
olaola6f32d5a2017-09-20 17:12:19 +020093 public final void setUp() throws Exception {
buchgrdc240042017-07-08 12:47:58 +020094 MockitoAnnotations.initMocks(this);
95
96 String serverName = "Server for " + this.getClass();
97 server = InProcessServerBuilder.forName(serverName).fallbackHandlerRegistry(serviceRegistry)
98 .build().start();
99 channel = InProcessChannelBuilder.forName(serverName).build();
olaola6f32d5a2017-09-20 17:12:19 +0200100 withEmptyMetadata =
101 TracingMetadataUtils.contextWithMetadata(
102 "none", "none", Digests.unsafeActionKeyFromDigest(Digest.getDefaultInstance()));
103 // Needs to be repeated in every test that uses the timeout setting, since the tests run
104 // on different threads than the setUp.
105 withEmptyMetadata.attach();
buchgrdc240042017-07-08 12:47:58 +0200106 }
107
108 @After
olaola6f32d5a2017-09-20 17:12:19 +0200109 public void tearDown() throws Exception {
buchgrdc240042017-07-08 12:47:58 +0200110 server.shutdownNow();
111 retryService.shutdownNow();
112 }
113
114 @Test(timeout = 10000)
115 public void singleBlobUploadShouldWork() throws Exception {
olaola6f32d5a2017-09-20 17:12:19 +0200116 withEmptyMetadata.attach();
buchgrdc240042017-07-08 12:47:58 +0200117 Retrier retrier = new Retrier(() -> mockBackoff, (Status s) -> true);
118 ByteStreamUploader uploader =
119 new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService);
120
121 byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
122 new Random().nextBytes(blob);
123
buchgr226510b2017-07-12 10:29:27 +0200124 Chunker chunker = new Chunker(blob, CHUNK_SIZE);
buchgrdc240042017-07-08 12:47:58 +0200125
126 serviceRegistry.addService(new ByteStreamImplBase() {
127 @Override
128 public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
129 return new StreamObserver<WriteRequest>() {
130
131 byte[] receivedData = new byte[blob.length];
132 long nextOffset = 0;
133
134 @Override
135 public void onNext(WriteRequest writeRequest) {
136 if (nextOffset == 0) {
137 assertThat(writeRequest.getResourceName()).isNotEmpty();
138 assertThat(writeRequest.getResourceName()).startsWith(INSTANCE_NAME + "/uploads");
139 assertThat(writeRequest.getResourceName()).endsWith(String.valueOf(blob.length));
140 } else {
141 assertThat(writeRequest.getResourceName()).isEmpty();
142 }
143
144 assertThat(writeRequest.getWriteOffset()).isEqualTo(nextOffset);
145
146 ByteString data = writeRequest.getData();
147
148 System.arraycopy(data.toByteArray(), 0, receivedData, (int) nextOffset,
149 data.size());
150
151 nextOffset += data.size();
152 boolean lastWrite = blob.length == nextOffset;
153 assertThat(writeRequest.getFinishWrite()).isEqualTo(lastWrite);
154 }
155
156 @Override
157 public void onError(Throwable throwable) {
158 fail("onError should never be called.");
159 }
160
161 @Override
162 public void onCompleted() {
163 assertThat(nextOffset).isEqualTo(blob.length);
164 assertThat(receivedData).isEqualTo(blob);
165
166 WriteResponse response =
167 WriteResponse.newBuilder().setCommittedSize(nextOffset).build();
168 streamObserver.onNext(response);
169 streamObserver.onCompleted();
170 }
171 };
172 }
173 });
174
buchgr226510b2017-07-12 10:29:27 +0200175 uploader.uploadBlob(chunker);
buchgrdc240042017-07-08 12:47:58 +0200176
177 // This test should not have triggered any retries.
178 Mockito.verifyZeroInteractions(mockBackoff);
179
buchgr7c227992017-07-11 12:13:32 +0200180 blockUntilInternalStateConsistent(uploader);
buchgrdc240042017-07-08 12:47:58 +0200181 }
182
183 @Test(timeout = 20000)
184 public void multipleBlobsUploadShouldWork() throws Exception {
olaola6f32d5a2017-09-20 17:12:19 +0200185 withEmptyMetadata.attach();
buchgrdc240042017-07-08 12:47:58 +0200186 Retrier retrier = new Retrier(() -> new FixedBackoff(1, 0), (Status s) -> true);
187 ByteStreamUploader uploader =
188 new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService);
189
190 int numUploads = 100;
191 Map<String, byte[]> blobsByHash = new HashMap<>();
buchgr226510b2017-07-12 10:29:27 +0200192 List<Chunker> builders = new ArrayList<>(numUploads);
buchgrdc240042017-07-08 12:47:58 +0200193 Random rand = new Random();
194 for (int i = 0; i < numUploads; i++) {
195 int blobSize = rand.nextInt(CHUNK_SIZE * 10) + CHUNK_SIZE;
196 byte[] blob = new byte[blobSize];
197 rand.nextBytes(blob);
buchgr226510b2017-07-12 10:29:27 +0200198 Chunker chunker = new Chunker(blob, CHUNK_SIZE);
199 builders.add(chunker);
200 blobsByHash.put(chunker.digest().getHash(), blob);
buchgrdc240042017-07-08 12:47:58 +0200201 }
202
203 Set<String> uploadsFailedOnce = Collections.synchronizedSet(new HashSet<>());
204
205 serviceRegistry.addService(new ByteStreamImplBase() {
206 @Override
207 public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
208 return new StreamObserver<WriteRequest>() {
209
210 private String digestHash;
211 private byte[] receivedData;
212 private long nextOffset;
213
214 @Override
215 public void onNext(WriteRequest writeRequest) {
216 if (nextOffset == 0) {
217 String resourceName = writeRequest.getResourceName();
218 assertThat(resourceName).isNotEmpty();
219
220 String[] components = resourceName.split("/");
221 assertThat(components).hasLength(6);
222 digestHash = components[4];
223 assertThat(blobsByHash).containsKey(digestHash);
224 receivedData = new byte[Integer.parseInt(components[5])];
225 }
226 assertThat(digestHash).isNotNull();
227 // An upload for a given blob has a 10% chance to fail once during its lifetime.
228 // This is to exercise the retry mechanism a bit.
229 boolean shouldFail =
230 rand.nextInt(10) == 0 && !uploadsFailedOnce.contains(digestHash);
231 if (shouldFail) {
232 uploadsFailedOnce.add(digestHash);
233 response.onError(Status.INTERNAL.asException());
234 return;
235 }
236
237 ByteString data = writeRequest.getData();
238 System.arraycopy(
239 data.toByteArray(), 0, receivedData, (int) nextOffset, data.size());
240 nextOffset += data.size();
241
242 boolean lastWrite = nextOffset == receivedData.length;
243 assertThat(writeRequest.getFinishWrite()).isEqualTo(lastWrite);
244 }
245
246 @Override
247 public void onError(Throwable throwable) {
248 fail("onError should never be called.");
249 }
250
251 @Override
252 public void onCompleted() {
253 byte[] expectedBlob = blobsByHash.get(digestHash);
254 assertThat(receivedData).isEqualTo(expectedBlob);
255
256 WriteResponse writeResponse =
257 WriteResponse.newBuilder().setCommittedSize(receivedData.length).build();
258
259 response.onNext(writeResponse);
260 response.onCompleted();
261 }
262 };
263 }
264 });
265
266 uploader.uploadBlobs(builders);
267
buchgr7c227992017-07-11 12:13:32 +0200268 blockUntilInternalStateConsistent(uploader);
buchgrdc240042017-07-08 12:47:58 +0200269 }
270
olaola6f32d5a2017-09-20 17:12:19 +0200271 @Test(timeout = 20000)
272 public void contextShouldBePreservedUponRetries() throws Exception {
273 withEmptyMetadata.attach();
274 // We upload blobs with different context, and retry 3 times for each upload.
275 // We verify that the correct metadata is passed to the server with every blob.
276 Retrier retrier = new Retrier(() -> new FixedBackoff(3, 0), (Status s) -> true);
277 ByteStreamUploader uploader =
278 new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService);
279
280 List<String> toUpload = ImmutableList.of("aaaaaaaaaa", "bbbbbbbbbb", "cccccccccc");
281 List<Chunker> builders = new ArrayList<>(toUpload.size());
282 Map<String, Integer> uploadsFailed = new HashMap<>();
283 for (String s : toUpload) {
284 Chunker chunker = new Chunker(s.getBytes(UTF_8), /* chunkSize=*/ 3);
285 builders.add(chunker);
286 uploadsFailed.put(chunker.digest().getHash(), 0);
287 }
288
289 BindableService bsService =
290 new ByteStreamImplBase() {
291 @Override
292 public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
293 return new StreamObserver<WriteRequest>() {
294
295 private String digestHash;
296
297 @Override
298 public void onNext(WriteRequest writeRequest) {
299 String resourceName = writeRequest.getResourceName();
300 if (!resourceName.isEmpty()) {
301 String[] components = resourceName.split("/");
302 assertThat(components).hasLength(6);
303 digestHash = components[4];
304 }
305 assertThat(digestHash).isNotNull();
306 RequestMetadata meta = TracingMetadataUtils.fromCurrentContext();
307 assertThat(meta.getCorrelatedInvocationsId()).isEqualTo("build-req-id");
308 assertThat(meta.getToolInvocationId()).isEqualTo("command-id");
309 assertThat(meta.getActionId()).isEqualTo(digestHash);
310 assertThat(meta.getToolDetails().getToolName()).isEqualTo("bazel");
311 assertThat(meta.getToolDetails().getToolVersion())
312 .isEqualTo(BlazeVersionInfo.instance().getVersion());
313 synchronized (this) {
314 Integer numFailures = uploadsFailed.get(digestHash);
315 if (numFailures < 3) {
316 uploadsFailed.put(digestHash, numFailures + 1);
317 response.onError(Status.INTERNAL.asException());
318 return;
319 }
320 }
321 }
322
323 @Override
324 public void onError(Throwable throwable) {
325 fail("onError should never be called.");
326 }
327
328 @Override
329 public void onCompleted() {
330 response.onNext(WriteResponse.newBuilder().setCommittedSize(10).build());
331 response.onCompleted();
332 }
333 };
334 }
335 };
336 serviceRegistry.addService(
337 ServerInterceptors.intercept(
338 bsService, new TracingMetadataUtils.ServerHeadersInterceptor()));
339
340 List<ListenableFuture<Void>> uploads = new ArrayList<>();
341
342 for (Chunker chunker : builders) {
343 Context ctx =
344 TracingMetadataUtils.contextWithMetadata(
345 "build-req-id", "command-id", Digests.unsafeActionKeyFromDigest(chunker.digest()));
346 ctx.call(
347 () -> {
348 uploads.add(uploader.uploadBlobAsync(chunker));
349 return null;
350 });
351 }
352
353 for (ListenableFuture<Void> upload : uploads) {
354 upload.get();
355 }
356
357 blockUntilInternalStateConsistent(uploader);
358 }
359
buchgrdc240042017-07-08 12:47:58 +0200360 @Test(timeout = 10000)
361 public void sameBlobShouldNotBeUploadedTwice() throws Exception {
362 // Test that uploading the same file concurrently triggers only one file upload.
363
olaola6f32d5a2017-09-20 17:12:19 +0200364 withEmptyMetadata.attach();
buchgrdc240042017-07-08 12:47:58 +0200365 Retrier retrier = new Retrier(() -> mockBackoff, (Status s) -> true);
366 ByteStreamUploader uploader =
367 new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService);
368
369 byte[] blob = new byte[CHUNK_SIZE * 10];
buchgr226510b2017-07-12 10:29:27 +0200370 Chunker chunker = new Chunker(blob, CHUNK_SIZE);
buchgrdc240042017-07-08 12:47:58 +0200371
372 AtomicInteger numWriteCalls = new AtomicInteger();
buchgr423a46a2017-07-12 11:24:43 +0200373 CountDownLatch blocker = new CountDownLatch(1);
buchgrdc240042017-07-08 12:47:58 +0200374
375 serviceRegistry.addService(new ByteStreamImplBase() {
376 @Override
377 public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
378 numWriteCalls.incrementAndGet();
buchgr423a46a2017-07-12 11:24:43 +0200379 try {
380 // Ensures that the first upload does not finish, before the second upload is started.
381 blocker.await();
382 } catch (InterruptedException e) {
383 Thread.currentThread().interrupt();
384 }
buchgrdc240042017-07-08 12:47:58 +0200385
386 return new StreamObserver<WriteRequest>() {
387
388 private long bytesReceived;
389
390 @Override
391 public void onNext(WriteRequest writeRequest) {
392 bytesReceived += writeRequest.getData().size();
393 }
394
395 @Override
396 public void onError(Throwable throwable) {
397 fail("onError should never be called.");
398 }
399
400 @Override
401 public void onCompleted() {
402 response.onNext(WriteResponse.newBuilder().setCommittedSize(bytesReceived).build());
403 response.onCompleted();
404 }
405 };
406 }
407 });
408
buchgr226510b2017-07-12 10:29:27 +0200409 Future<?> upload1 = uploader.uploadBlobAsync(chunker);
410 Future<?> upload2 = uploader.uploadBlobAsync(chunker);
buchgrdc240042017-07-08 12:47:58 +0200411
buchgr423a46a2017-07-12 11:24:43 +0200412 blocker.countDown();
413
buchgrdc240042017-07-08 12:47:58 +0200414 assertThat(upload1).isSameAs(upload2);
415
416 upload1.get();
417
418 assertThat(numWriteCalls.get()).isEqualTo(1);
419 }
420
421 @Test(timeout = 10000)
422 public void errorsShouldBeReported() throws IOException, InterruptedException {
olaola6f32d5a2017-09-20 17:12:19 +0200423 withEmptyMetadata.attach();
buchgrdc240042017-07-08 12:47:58 +0200424 Retrier retrier = new Retrier(() -> new FixedBackoff(1, 10), (Status s) -> true);
425 ByteStreamUploader uploader =
426 new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService);
427
428 byte[] blob = new byte[CHUNK_SIZE];
buchgr226510b2017-07-12 10:29:27 +0200429 Chunker chunker = new Chunker(blob, CHUNK_SIZE);
buchgrdc240042017-07-08 12:47:58 +0200430
431 serviceRegistry.addService(new ByteStreamImplBase() {
432 @Override
433 public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
434 response.onError(Status.INTERNAL.asException());
435 return new NoopStreamObserver();
436 }
437 });
438
439 try {
buchgr226510b2017-07-12 10:29:27 +0200440 uploader.uploadBlob(chunker);
buchgrdc240042017-07-08 12:47:58 +0200441 fail("Should have thrown an exception.");
442 } catch (RetryException e) {
443 assertThat(e.getAttempts()).isEqualTo(2);
444 assertThat(e.causedByStatusCode(Code.INTERNAL)).isTrue();
445 }
446 }
447
448 @Test(timeout = 10000)
449 public void shutdownShouldCancelOngoingUploads() throws Exception {
olaola6f32d5a2017-09-20 17:12:19 +0200450 withEmptyMetadata.attach();
buchgrdc240042017-07-08 12:47:58 +0200451 Retrier retrier = new Retrier(() -> new FixedBackoff(1, 10), (Status s) -> true);
452 ByteStreamUploader uploader =
453 new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService);
454
455 CountDownLatch cancellations = new CountDownLatch(2);
456
457 ServerServiceDefinition service =
458 ServerServiceDefinition.builder(ByteStreamGrpc.SERVICE_NAME)
459 .addMethod(ByteStreamGrpc.METHOD_WRITE,
460 new ServerCallHandler<WriteRequest, WriteResponse>() {
461 @Override
462 public Listener<WriteRequest> startCall(ServerCall<WriteRequest, WriteResponse> call,
463 Metadata headers) {
464 // Don't request() any messages from the client, so that the client will be blocked
465 // on flow control and thus the call will sit there idle long enough to receive the
466 // cancellation.
467 return new Listener<WriteRequest>() {
468 @Override
469 public void onCancel() {
470 cancellations.countDown();
471 }
472 };
473 }
474 })
475 .build();
476
477 serviceRegistry.addService(service);
478
479 byte[] blob1 = new byte[CHUNK_SIZE];
buchgr226510b2017-07-12 10:29:27 +0200480 Chunker chunker1 = new Chunker(blob1, CHUNK_SIZE);
buchgrdc240042017-07-08 12:47:58 +0200481
482 byte[] blob2 = new byte[CHUNK_SIZE + 1];
buchgr226510b2017-07-12 10:29:27 +0200483 Chunker chunker2 = new Chunker(blob2, CHUNK_SIZE);
buchgrdc240042017-07-08 12:47:58 +0200484
buchgr226510b2017-07-12 10:29:27 +0200485 ListenableFuture<Void> f1 = uploader.uploadBlobAsync(chunker1);
486 ListenableFuture<Void> f2 = uploader.uploadBlobAsync(chunker2);
buchgrdc240042017-07-08 12:47:58 +0200487
488 assertThat(uploader.uploadsInProgress()).isTrue();
489
490 uploader.shutdown();
491
492 cancellations.await();
493
494 assertThat(f1.isCancelled()).isTrue();
495 assertThat(f2.isCancelled()).isTrue();
496
buchgr7c227992017-07-11 12:13:32 +0200497 blockUntilInternalStateConsistent(uploader);
buchgrdc240042017-07-08 12:47:58 +0200498 }
499
500 @Test(timeout = 10000)
501 public void failureInRetryExecutorShouldBeHandled() throws Exception {
olaola6f32d5a2017-09-20 17:12:19 +0200502 withEmptyMetadata.attach();
buchgrdc240042017-07-08 12:47:58 +0200503 Retrier retrier = new Retrier(() -> new FixedBackoff(1, 10), (Status s) -> true);
504 ByteStreamUploader uploader =
505 new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService);
506
507 serviceRegistry.addService(new ByteStreamImplBase() {
508 @Override
509 public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
510 // Immediately fail the call, so that it is retried.
511 response.onError(Status.ABORTED.asException());
512 return new NoopStreamObserver();
513 }
514 });
515
516 retryService.shutdownNow();
517 // Random very high timeout, as the test will timeout by itself.
518 retryService.awaitTermination(1, TimeUnit.DAYS);
519 assertThat(retryService.isShutdown()).isTrue();
520
521 byte[] blob = new byte[1];
buchgr226510b2017-07-12 10:29:27 +0200522 Chunker chunker = new Chunker(blob, CHUNK_SIZE);
buchgrdc240042017-07-08 12:47:58 +0200523 try {
buchgr226510b2017-07-12 10:29:27 +0200524 uploader.uploadBlob(chunker);
buchgrdc240042017-07-08 12:47:58 +0200525 fail("Should have thrown an exception.");
526 } catch (RetryException e) {
527 assertThat(e).hasCauseThat().isInstanceOf(RejectedExecutionException.class);
528 }
529 }
530
531 @Test(timeout = 10000)
532 public void resourceNameWithoutInstanceName() throws Exception {
olaola6f32d5a2017-09-20 17:12:19 +0200533 withEmptyMetadata.attach();
buchgrdc240042017-07-08 12:47:58 +0200534 Retrier retrier = new Retrier(() -> mockBackoff, (Status s) -> true);
535 ByteStreamUploader uploader =
536 new ByteStreamUploader(/* instanceName */ null, channel, null, 3, retrier, retryService);
537
538 serviceRegistry.addService(new ByteStreamImplBase() {
539 @Override
540 public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
541 return new StreamObserver<WriteRequest>() {
542 @Override
543 public void onNext(WriteRequest writeRequest) {
544 // Test that the resource name doesn't start with an instance name.
545 assertThat(writeRequest.getResourceName()).startsWith("uploads/");
546 }
547
548 @Override
549 public void onError(Throwable throwable) {
550
551 }
552
553 @Override
554 public void onCompleted() {
555 response.onNext(WriteResponse.newBuilder().setCommittedSize(1).build());
556 response.onCompleted();
557 }
558 };
559 }
560 });
561
562 byte[] blob = new byte[1];
buchgr226510b2017-07-12 10:29:27 +0200563 Chunker chunker = new Chunker(blob, CHUNK_SIZE);
buchgrdc240042017-07-08 12:47:58 +0200564
buchgr226510b2017-07-12 10:29:27 +0200565 uploader.uploadBlob(chunker);
buchgrdc240042017-07-08 12:47:58 +0200566 }
567
568 @Test(timeout = 10000)
569 public void nonRetryableStatusShouldNotBeRetried() throws Exception {
olaola6f32d5a2017-09-20 17:12:19 +0200570 withEmptyMetadata.attach();
buchgrdc240042017-07-08 12:47:58 +0200571 Retrier retrier = new Retrier(() -> new FixedBackoff(1, 0),
572 /* No Status is retriable. */ (Status s) -> false);
573 ByteStreamUploader uploader =
574 new ByteStreamUploader(/* instanceName */ null, channel, null, 3, retrier, retryService);
575
576 AtomicInteger numCalls = new AtomicInteger();
577
578 serviceRegistry.addService(new ByteStreamImplBase() {
579 @Override
580 public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
581 numCalls.incrementAndGet();
582 response.onError(Status.INTERNAL.asException());
583 return new NoopStreamObserver();
584 }
585 });
586
587 byte[] blob = new byte[1];
buchgr226510b2017-07-12 10:29:27 +0200588 Chunker chunker = new Chunker(blob, CHUNK_SIZE);
buchgrdc240042017-07-08 12:47:58 +0200589
590 try {
buchgr226510b2017-07-12 10:29:27 +0200591 uploader.uploadBlob(chunker);
buchgrdc240042017-07-08 12:47:58 +0200592 fail("Should have thrown an exception.");
593 } catch (RetryException e) {
594 assertThat(numCalls.get()).isEqualTo(1);
595 }
596 }
597
598 private static class NoopStreamObserver implements StreamObserver<WriteRequest> {
599 @Override
600 public void onNext(WriteRequest writeRequest) {
601 }
602
603 @Override
604 public void onError(Throwable throwable) {
605 }
606
607 @Override
608 public void onCompleted() {
609 }
610 }
611
612 private static class FixedBackoff implements Retrier.Backoff {
613
614 private final int maxRetries;
615 private final int delayMillis;
616
617 private int retries;
618
619 public FixedBackoff(int maxRetries, int delayMillis) {
620 this.maxRetries = maxRetries;
621 this.delayMillis = delayMillis;
622 }
623
624 @Override
625 public long nextDelayMillis() {
626 if (retries < maxRetries) {
627 retries++;
628 return delayMillis;
629 }
630 return -1;
631 }
632
633 @Override
634 public int getRetryAttempts() {
635 return retries;
636 }
637 }
buchgr7c227992017-07-11 12:13:32 +0200638
639 private void blockUntilInternalStateConsistent(ByteStreamUploader uploader) throws Exception {
640 // Poll until all upload futures have been removed from the internal hash map. The polling is
641 // necessary, as listeners are executed after Future.get() calls are notified about completion.
642 while (uploader.uploadsInProgress()) {
643 Thread.sleep(1);
644 }
645 }
buchgrdc240042017-07-08 12:47:58 +0200646}