// Copyright 2018 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.remote;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;

import build.bazel.remote.execution.v2.Digest;
import com.google.bytestream.ByteStreamProto.WriteRequest;
import com.google.bytestream.ByteStreamProto.WriteResponse;
import com.google.common.io.BaseEncoding;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.buildeventstream.BuildEvent.LocalFile;
import com.google.devtools.build.lib.buildeventstream.BuildEvent.LocalFile.LocalFileType;
import com.google.devtools.build.lib.buildeventstream.PathConverter;
import com.google.devtools.build.lib.clock.JavaClock;
import com.google.devtools.build.lib.remote.ByteStreamUploaderTest.FixedBackoff;
import com.google.devtools.build.lib.remote.ByteStreamUploaderTest.MaybeFailOnceUploadService;
import com.google.devtools.build.lib.remote.Retrier.RetryException;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import com.google.devtools.build.lib.vfs.DigestHashFunction;
import com.google.devtools.build.lib.vfs.FileSystem;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem;
import io.grpc.Context;
import io.grpc.ManagedChannel;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.StreamObserver;
import io.grpc.util.MutableHandlerRegistry;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.MockitoAnnotations;

/** Test for {@link ByteStreamBuildEventArtifactUploader}. */
@RunWith(JUnit4.class)
public class ByteStreamBuildEventArtifactUploaderTest {

  private static final DigestUtil DIGEST_UTIL = new DigestUtil(DigestHashFunction.SHA256);

  private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
  private static ListeningScheduledExecutorService retryService;

  private Server server;
  private ManagedChannel channel;
  private Context withEmptyMetadata;
  private Context prevContext;
  private final FileSystem fs = new InMemoryFileSystem(new JavaClock(), DigestHashFunction.SHA256);

  @BeforeClass
  public static void beforeEverything() {
    retryService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
  }

  @Before
  public final void setUp() throws Exception {
    MockitoAnnotations.initMocks(this);

    String serverName = "Server for " + this.getClass();
    server =
        InProcessServerBuilder.forName(serverName)
            .fallbackHandlerRegistry(serviceRegistry)
            .build()
            .start();
    channel = InProcessChannelBuilder.forName(serverName).build();
    withEmptyMetadata =
        TracingMetadataUtils.contextWithMetadata(
            "none", "none", DIGEST_UTIL.asActionKey(Digest.getDefaultInstance()));
    // Needs to be repeated in every test that uses the timeout setting, since the tests run
    // on different threads than the setUp.
    prevContext = withEmptyMetadata.attach();
  }

  @After
  public void tearDown() throws Exception {
    // Needs to be repeated in every test that uses the timeout setting, since the tests run
    // on different threads than the tearDown.
    withEmptyMetadata.detach(prevContext);

    server.shutdownNow();
    server.awaitTermination();
  }

  @AfterClass
  public static void afterEverything() {
    retryService.shutdownNow();
  }

  @Before
  public void setup() {
    MockitoAnnotations.initMocks(this);
  }

  @Test
  public void uploadsShouldWork() throws Exception {
    int numUploads = 2;
    Map<String, byte[]> blobsByHash = new HashMap<>();
    Map<Path, LocalFile> filesToUpload = new HashMap<>();
    Random rand = new Random();
    for (int i = 0; i < numUploads; i++) {
      Path file = fs.getPath("/file" + i);
      OutputStream out = file.getOutputStream();
      int blobSize = rand.nextInt(100) + 1;
      byte[] blob = new byte[blobSize];
      rand.nextBytes(blob);
      out.write(blob);
      out.close();
      blobsByHash.put(DIGEST_UTIL.compute(file).getHash(), blob);
      filesToUpload.put(file, new LocalFile(file, LocalFileType.OUTPUT));
    }
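    // Serve the generated blobs from an in-process ByteStream service; as its name suggests,
    // MaybeFailOnceUploadService may fail an upload once before accepting it.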
    serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash));

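    // Retry every error once with no backoff, so a single injected failure per blob still
    // lets the upload succeed.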
    RemoteRetrier retrier =
        new RemoteRetrier(
            () -> new FixedBackoff(1, 0), (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
    ReferenceCountedChannel refCntChannel = new ReferenceCountedChannel(channel);
    ByteStreamUploader uploader =
        new ByteStreamUploader("instance", refCntChannel, null, 3, retrier);
    ByteStreamBuildEventArtifactUploader artifactUploader =
        new ByteStreamBuildEventArtifactUploader(
            uploader, "localhost", withEmptyMetadata, "instance");

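    // Upload all files and verify that each local path converts to the expected
    // bytestream:// URI derived from its digest and size.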
    PathConverter pathConverter = artifactUploader.upload(filesToUpload).get();
    for (Path file : filesToUpload.keySet()) {
      String hash = BaseEncoding.base16().lowerCase().encode(file.getDigest());
      long size = file.getFileSize();
      String conversion = pathConverter.apply(file);
      assertThat(conversion)
          .isEqualTo("bytestream://localhost/instance/blobs/" + hash + "/" + size);
    }

    artifactUploader.shutdown();

    assertThat(uploader.refCnt()).isEqualTo(0);
    assertThat(refCntChannel.isShutdown()).isTrue();
  }

  @Test
  public void testUploadDirectoryDoesNotCrash() throws Exception {
    Path dir = fs.getPath("/dir");
    dir.createDirectoryAndParents();
    Map<Path, LocalFile> filesToUpload = new HashMap<>();
    filesToUpload.put(dir, new LocalFile(dir, LocalFileType.OUTPUT));
    ByteStreamUploader uploader = mock(ByteStreamUploader.class);
    ByteStreamBuildEventArtifactUploader artifactUploader =
        new ByteStreamBuildEventArtifactUploader(
            uploader, "localhost", withEmptyMetadata, "instance");
    PathConverter pathConverter = artifactUploader.upload(filesToUpload).get();
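    // Directories are skipped rather than uploaded, so the path converter has no mapping for them.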
    assertThat(pathConverter.apply(dir)).isNull();
    artifactUploader.shutdown();
  }

  @Test
  public void someUploadsFail() throws Exception {
    // Test that if one of multiple file uploads fails, the upload future fails and that the
    // error is propagated correctly.

    int numUploads = 10;
    Map<String, byte[]> blobsByHash = new HashMap<>();
    Map<Path, LocalFile> filesToUpload = new HashMap<>();
    Random rand = new Random();
    for (int i = 0; i < numUploads; i++) {
      Path file = fs.getPath("/file" + i);
      OutputStream out = file.getOutputStream();
      int blobSize = rand.nextInt(100) + 1;
      byte[] blob = new byte[blobSize];
      rand.nextBytes(blob);
      out.write(blob);
      out.flush();
      out.close();
      blobsByHash.put(DIGEST_UTIL.compute(file).getHash(), blob);
      filesToUpload.put(file, new LocalFile(file, LocalFileType.OUTPUT));
    }
    String hashOfBlobThatShouldFail = blobsByHash.keySet().iterator().next();
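    // Wrap the upload service so that any write for the chosen blob is rejected with CANCELLED,
    // while all other writes are delegated to the well-behaved service.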
    serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash) {
      @Override
      public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
        StreamObserver<WriteRequest> delegate = super.write(response);
        return new StreamObserver<WriteRequest>() {
          @Override
          public void onNext(WriteRequest value) {
            if (value.getResourceName().contains(hashOfBlobThatShouldFail)) {
              response.onError(Status.CANCELLED.asException());
            } else {
              delegate.onNext(value);
            }
          }

          @Override
          public void onError(Throwable t) {
            delegate.onError(t);
          }

          @Override
          public void onCompleted() {
            delegate.onCompleted();
          }
        };
      }
    });

    RemoteRetrier retrier =
        new RemoteRetrier(
            () -> new FixedBackoff(1, 0), (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
    ReferenceCountedChannel refCntChannel = new ReferenceCountedChannel(channel);
    ByteStreamUploader uploader =
        new ByteStreamUploader("instance", refCntChannel, null, 3, retrier);
    ByteStreamBuildEventArtifactUploader artifactUploader =
        new ByteStreamBuildEventArtifactUploader(
            uploader, "localhost", withEmptyMetadata, "instance");

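    // The overall upload must fail, and the CANCELLED status of the failed blob must be
    // propagated through the RetryException.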
    try {
      artifactUploader.upload(filesToUpload).get();
      fail("exception expected.");
    } catch (ExecutionException e) {
      assertThat(e.getCause()).isInstanceOf(RetryException.class);
      assertThat(Status.fromThrowable(e).getCode()).isEqualTo(Status.CANCELLED.getCode());
    }

    artifactUploader.shutdown();

    assertThat(uploader.refCnt()).isEqualTo(0);
    assertThat(refCntChannel.isShutdown()).isTrue();
  }
}