blob: 9c90f6dd1d2564121d9ae9d8dac115e8c4e2c834 [file] [log] [blame]
// Copyright 2018 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.remote;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import build.bazel.remote.execution.v2.CacheCapabilities;
import build.bazel.remote.execution.v2.Digest;
import com.google.bytestream.ByteStreamProto.WriteRequest;
import com.google.bytestream.ByteStreamProto.WriteResponse;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.eventbus.EventBus;
import com.google.common.hash.HashCode;
import com.google.common.io.BaseEncoding;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.actions.ActionInputMap;
import com.google.devtools.build.lib.actions.Artifact;
import com.google.devtools.build.lib.actions.ArtifactRoot;
import com.google.devtools.build.lib.actions.ArtifactRoot.RootType;
import com.google.devtools.build.lib.actions.FileArtifactValue;
import com.google.devtools.build.lib.actions.FileArtifactValue.RemoteFileArtifactValue;
import com.google.devtools.build.lib.actions.StaticInputMetadataProvider;
import com.google.devtools.build.lib.actions.util.ActionsTestUtil;
import com.google.devtools.build.lib.authandtls.CallCredentialsProvider;
import com.google.devtools.build.lib.buildeventstream.BuildEvent.LocalFile;
import com.google.devtools.build.lib.buildeventstream.BuildEvent.LocalFile.LocalFileType;
import com.google.devtools.build.lib.buildeventstream.PathConverter;
import com.google.devtools.build.lib.clock.JavaClock;
import com.google.devtools.build.lib.events.Reporter;
import com.google.devtools.build.lib.events.StoredEventHandler;
import com.google.devtools.build.lib.remote.ByteStreamUploaderTest.FixedBackoff;
import com.google.devtools.build.lib.remote.ByteStreamUploaderTest.MaybeFailOnceUploadService;
import com.google.devtools.build.lib.remote.common.MissingDigestsFinder;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.grpc.ChannelConnectionFactory;
import com.google.devtools.build.lib.remote.options.RemoteBuildEventUploadMode;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.RxNoGlobalErrorsRule;
import com.google.devtools.build.lib.remote.util.TestUtils;
import com.google.devtools.build.lib.vfs.DigestHashFunction;
import com.google.devtools.build.lib.vfs.FileSystem;
import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.SyscallCache;
import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem;
import com.google.devtools.common.options.Options;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.StreamObserver;
import io.grpc.util.MutableHandlerRegistry;
import io.reactivex.rxjava3.core.Single;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
/** Test for {@link ByteStreamBuildEventArtifactUploader}. */
@RunWith(JUnit4.class)
public class ByteStreamBuildEventArtifactUploaderTest {
private static final DigestUtil DIGEST_UTIL =
new DigestUtil(SyscallCache.NO_CACHE, DigestHashFunction.SHA256);
@Rule public final RxNoGlobalErrorsRule rxNoGlobalErrorsRule = new RxNoGlobalErrorsRule();
private final Reporter reporter = new Reporter(new EventBus());
private final StoredEventHandler eventHandler = new StoredEventHandler();
private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
private ListeningScheduledExecutorService retryService;
private Server server;
private ChannelConnectionFactory channelConnectionFactory;
private final FileSystem fs = new InMemoryFileSystem(new JavaClock(), DigestHashFunction.SHA256);
private final Path execRoot = fs.getPath("/execroot");
private ArtifactRoot outputRoot;
@Before
public final void setUp() throws Exception {
reporter.addHandler(eventHandler);
String serverName = "Server for " + this.getClass();
server =
InProcessServerBuilder.forName(serverName)
.fallbackHandlerRegistry(serviceRegistry)
.build()
.start();
channelConnectionFactory =
new ChannelConnectionFactory() {
@Override
public Single<? extends ChannelConnection> create() {
return Single.just(
new ChannelConnection(InProcessChannelBuilder.forName(serverName).build()));
}
@Override
public int maxConcurrency() {
return 100;
}
};
outputRoot = ArtifactRoot.asDerivedRoot(execRoot, RootType.Output, "out");
outputRoot.getRoot().asPath().createDirectoryAndParents();
retryService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
}
@After
public void tearDown() throws Exception {
retryService.shutdownNow();
retryService.awaitTermination(
com.google.devtools.build.lib.testutil.TestUtils.WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
server.shutdownNow();
server.awaitTermination();
}
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
}
@Test
public void uploadsShouldWork() throws Exception {
int numUploads = 2;
Map<HashCode, byte[]> blobsByHash = new HashMap<>();
Map<Path, LocalFile> filesToUpload = new HashMap<>();
Random rand = new Random();
for (int i = 0; i < numUploads; i++) {
Path file = fs.getPath("/file" + i);
OutputStream out = file.getOutputStream();
int blobSize = rand.nextInt(100) + 1;
byte[] blob = new byte[blobSize];
rand.nextBytes(blob);
out.write(blob);
out.close();
blobsByHash.put(HashCode.fromString(DIGEST_UTIL.compute(file).getHash()), blob);
filesToUpload.put(
file,
new LocalFile(
file, LocalFileType.OUTPUT, /*artifact=*/ null, /*artifactMetadata=*/ null));
}
serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash));
RemoteRetrier retrier =
TestUtils.newRemoteRetrier(() -> new FixedBackoff(1, 0), (e) -> true, retryService);
ReferenceCountedChannel refCntChannel = new ReferenceCountedChannel(channelConnectionFactory);
RemoteCache remoteCache = newRemoteCache(refCntChannel, retrier);
ByteStreamBuildEventArtifactUploader artifactUploader = newArtifactUploader(remoteCache);
PathConverter pathConverter = artifactUploader.upload(filesToUpload).get();
for (Path file : filesToUpload.keySet()) {
String hash = BaseEncoding.base16().lowerCase().encode(file.getDigest());
long size = file.getFileSize();
String conversion = pathConverter.apply(file);
assertThat(conversion)
.isEqualTo("bytestream://localhost/instance/blobs/" + hash + "/" + size);
}
artifactUploader.release();
assertThat(remoteCache.refCnt()).isEqualTo(0);
assertThat(refCntChannel.isShutdown()).isTrue();
}
@Test
public void uploadsShouldWork_fewerPermitsThanUploads() throws Exception {
int numUploads = 2;
Map<HashCode, byte[]> blobsByHash = new HashMap<>();
Map<Path, LocalFile> filesToUpload = new HashMap<>();
Random rand = new Random();
for (int i = 0; i < numUploads; i++) {
Path file = fs.getPath("/file" + i);
OutputStream out = file.getOutputStream();
int blobSize = rand.nextInt(100) + 1;
byte[] blob = new byte[blobSize];
rand.nextBytes(blob);
out.write(blob);
out.close();
blobsByHash.put(HashCode.fromString(DIGEST_UTIL.compute(file).getHash()), blob);
filesToUpload.put(
file,
new LocalFile(
file, LocalFileType.OUTPUT, /*artifact=*/ null, /*artifactMetadata=*/ null));
}
serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash));
RemoteRetrier retrier =
TestUtils.newRemoteRetrier(() -> new FixedBackoff(1, 0), (e) -> true, retryService);
ReferenceCountedChannel refCntChannel = new ReferenceCountedChannel(channelConnectionFactory);
// number of permits is less than number of uploads to affirm permit is released
RemoteCache remoteCache = newRemoteCache(refCntChannel, retrier);
ByteStreamBuildEventArtifactUploader artifactUploader = newArtifactUploader(remoteCache);
PathConverter pathConverter = artifactUploader.upload(filesToUpload).get();
for (Path file : filesToUpload.keySet()) {
String hash = BaseEncoding.base16().lowerCase().encode(file.getDigest());
long size = file.getFileSize();
String conversion = pathConverter.apply(file);
assertThat(conversion)
.isEqualTo("bytestream://localhost/instance/blobs/" + hash + "/" + size);
}
artifactUploader.release();
assertThat(remoteCache.refCnt()).isEqualTo(0);
assertThat(refCntChannel.isShutdown()).isTrue();
}
@Test
public void testUploadDirectoryDoesNotCrash() throws Exception {
Path dir = fs.getPath("/dir");
dir.createDirectoryAndParents();
Map<Path, LocalFile> filesToUpload = new HashMap<>();
filesToUpload.put(
dir,
new LocalFile(dir, LocalFileType.OUTPUT, /*artifact=*/ null, /*artifactMetadata=*/ null));
RemoteRetrier retrier =
TestUtils.newRemoteRetrier(() -> new FixedBackoff(1, 0), (e) -> true, retryService);
ReferenceCountedChannel refCntChannel = new ReferenceCountedChannel(channelConnectionFactory);
RemoteCache remoteCache = newRemoteCache(refCntChannel, retrier);
ByteStreamBuildEventArtifactUploader artifactUploader = newArtifactUploader(remoteCache);
PathConverter pathConverter = artifactUploader.upload(filesToUpload).get();
assertThat(pathConverter.apply(dir)).isNull();
artifactUploader.release();
}
@Test
public void someUploadsFail_succeedsWithWarningMessages() throws Exception {
// Test that if one of multiple file uploads fails, the upload future succeeds but the
// error is reported correctly.
int numUploads = 10;
Map<HashCode, byte[]> blobsByHash = new HashMap<>();
Map<Path, LocalFile> filesToUpload = new HashMap<>();
Random rand = new Random();
for (int i = 0; i < numUploads; i++) {
Path file = fs.getPath("/file" + i);
OutputStream out = file.getOutputStream();
int blobSize = rand.nextInt(100) + 1;
byte[] blob = new byte[blobSize];
rand.nextBytes(blob);
out.write(blob);
out.flush();
out.close();
blobsByHash.put(HashCode.fromString(DIGEST_UTIL.compute(file).getHash()), blob);
filesToUpload.put(
file,
new LocalFile(
file, LocalFileType.OUTPUT, /*artifact=*/ null, /*artifactMetadata=*/ null));
}
String hashOfBlobThatShouldFail = blobsByHash.keySet().iterator().next().toString();
serviceRegistry.addService(
new MaybeFailOnceUploadService(blobsByHash) {
@Override
public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
StreamObserver<WriteRequest> delegate = super.write(response);
return new StreamObserver<WriteRequest>() {
private boolean failed;
@Override
public void onNext(WriteRequest value) {
if (value.getResourceName().contains(hashOfBlobThatShouldFail)) {
response.onError(Status.CANCELLED.asException());
failed = true;
} else {
delegate.onNext(value);
}
}
@Override
public void onError(Throwable t) {
delegate.onError(t);
}
@Override
public void onCompleted() {
if (failed) {
return;
}
delegate.onCompleted();
}
};
}
});
RemoteRetrier retrier =
TestUtils.newRemoteRetrier(() -> new FixedBackoff(1, 0), (e) -> true, retryService);
ReferenceCountedChannel refCntChannel = new ReferenceCountedChannel(channelConnectionFactory);
RemoteCache remoteCache = newRemoteCache(refCntChannel, retrier);
ByteStreamBuildEventArtifactUploader artifactUploader = newArtifactUploader(remoteCache);
artifactUploader.upload(filesToUpload).get();
assertThat(eventHandler.getEvents()).isNotEmpty();
assertThat(eventHandler.getEvents().get(0).getMessage())
.contains("Uploading BEP referenced local file /file");
artifactUploader.release();
assertThat(remoteCache.refCnt()).isEqualTo(0);
assertThat(refCntChannel.isShutdown()).isTrue();
}
@Test
public void remoteFileShouldNotBeUploaded_actionFs() throws Exception {
// Test that we don't attempt to upload remotely stored file but convert the remote path
// to a bytestream:// URI.
// arrange
RemoteRetrier retrier =
TestUtils.newRemoteRetrier(() -> new FixedBackoff(1, 0), (e) -> true, retryService);
ReferenceCountedChannel refCntChannel = new ReferenceCountedChannel(channelConnectionFactory);
RemoteCache remoteCache = spy(newRemoteCache(refCntChannel, retrier));
RemoteActionInputFetcher actionInputFetcher = mock(RemoteActionInputFetcher.class);
ByteStreamBuildEventArtifactUploader artifactUploader = newArtifactUploader(remoteCache);
ActionInputMap outputs = new ActionInputMap(2);
Artifact artifact = createRemoteArtifact("file1.txt", "foo", outputs);
RemoteActionFileSystem remoteFs =
new RemoteActionFileSystem(
fs,
execRoot.asFragment(),
outputRoot.getRoot().asPath().relativeTo(execRoot).getPathString(),
outputs,
ImmutableList.of(artifact),
StaticInputMetadataProvider.empty(),
actionInputFetcher);
Path remotePath = remoteFs.getPath(artifact.getPath().getPathString());
assertThat(remotePath.getFileSystem()).isEqualTo(remoteFs);
LocalFile file =
new LocalFile(
remotePath, LocalFileType.OUTPUT, /*artifact=*/ null, /*artifactMetadata=*/ null);
// act
PathConverter pathConverter = artifactUploader.upload(ImmutableMap.of(remotePath, file)).get();
FileArtifactValue metadata = outputs.getInputMetadata(artifact);
Digest digest = DigestUtil.buildDigest(metadata.getDigest(), metadata.getSize());
// assert
String conversion = pathConverter.apply(remotePath);
assertThat(conversion)
.isEqualTo(
"bytestream://localhost/instance/blobs/"
+ digest.getHash()
+ "/"
+ digest.getSizeBytes());
verify(remoteCache, times(0)).uploadFile(any(), any(), any());
verify(remoteCache, times(0)).uploadBlob(any(), any(), any());
}
@Test
public void remoteFileShouldNotBeUploaded_findMissingDigests() throws Exception {
// Test that findMissingDigests is called to check which files exist remotely
// and that those are not uploaded.
// arrange
Path remoteFile = fs.getPath("/remote-file");
FileSystemUtils.writeContent(remoteFile, StandardCharsets.UTF_8, "hello world");
Digest remoteDigest = DIGEST_UTIL.compute(remoteFile);
Path localFile = fs.getPath("/local-file");
FileSystemUtils.writeContent(localFile, StandardCharsets.UTF_8, "foo bar");
Digest localDigest = DIGEST_UTIL.compute(localFile);
StaticMissingDigestsFinder digestQuerier =
Mockito.spy(new StaticMissingDigestsFinder(ImmutableSet.of(remoteDigest)));
RemoteRetrier retrier =
TestUtils.newRemoteRetrier(() -> new FixedBackoff(1, 0), (e) -> true, retryService);
ReferenceCountedChannel refCntChannel = new ReferenceCountedChannel(channelConnectionFactory);
RemoteCache remoteCache = spy(newRemoteCache(refCntChannel, retrier, digestQuerier));
doAnswer(invocationOnMock -> Futures.immediateFuture(null))
.when(remoteCache)
.uploadFile(any(), any(), any());
ByteStreamBuildEventArtifactUploader artifactUploader = newArtifactUploader(remoteCache);
// act
Map<Path, LocalFile> files =
ImmutableMap.of(
remoteFile,
new LocalFile(
remoteFile, LocalFileType.OUTPUT, /*artifact=*/ null, /*artifactMetadata=*/ null),
localFile,
new LocalFile(
localFile, LocalFileType.OUTPUT, /*artifact=*/ null, /*artifactMetadata=*/ null));
PathConverter pathConverter = artifactUploader.upload(files).get();
// assert
verify(digestQuerier).findMissingDigests(any(), any());
verify(remoteCache).uploadFile(any(), eq(localDigest), any());
assertThat(pathConverter.apply(remoteFile)).contains(remoteDigest.getHash());
assertThat(pathConverter.apply(localFile)).contains(localDigest.getHash());
}
/** Returns a remote artifact and puts its metadata into the action input map. */
private Artifact createRemoteArtifact(
String pathFragment, String contents, ActionInputMap inputs) {
Path p = outputRoot.getRoot().asPath().getRelative(pathFragment);
Artifact a = ActionsTestUtil.createArtifact(outputRoot, p);
byte[] b = contents.getBytes(StandardCharsets.UTF_8);
HashCode h = HashCode.fromString(DIGEST_UTIL.compute(b).getHash());
FileArtifactValue f =
RemoteFileArtifactValue.create(
h.asBytes(), b.length, /* locationIndex= */ 1, /* expireAtEpochMilli= */ -1);
inputs.putWithNoDepOwner(a, f);
return a;
}
private RemoteCache newRemoteCache(ReferenceCountedChannel channel, RemoteRetrier retrier) {
return newRemoteCache(channel, retrier, new AllMissingDigestsFinder());
}
private RemoteCache newRemoteCache(
ReferenceCountedChannel channel,
RemoteRetrier retrier,
MissingDigestsFinder missingDigestsFinder) {
RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class);
remoteOptions.remoteInstanceName = "instance";
GrpcCacheClient cacheClient =
spy(
new GrpcCacheClient(
channel,
CallCredentialsProvider.NO_CREDENTIALS,
remoteOptions,
retrier,
DIGEST_UTIL));
doAnswer(
invocationOnMock ->
missingDigestsFinder.findMissingDigests(
invocationOnMock.getArgument(0), invocationOnMock.getArgument(1)))
.when(cacheClient)
.findMissingDigests(any(), any());
return new RemoteCache(
CacheCapabilities.getDefaultInstance(), cacheClient, remoteOptions, DIGEST_UTIL);
}
private ByteStreamBuildEventArtifactUploader newArtifactUploader(RemoteCache remoteCache) {
return new ByteStreamBuildEventArtifactUploader(
MoreExecutors.directExecutor(),
reporter,
/*verboseFailures=*/ true,
remoteCache,
/*remoteServerInstanceName=*/ "localhost/instance",
/*buildRequestId=*/ "none",
/*commandId=*/ "none",
SyscallCache.NO_CACHE,
RemoteBuildEventUploadMode.ALL);
}
private static class StaticMissingDigestsFinder implements MissingDigestsFinder {
private final ImmutableSet<Digest> knownDigests;
public StaticMissingDigestsFinder(ImmutableSet<Digest> knownDigests) {
this.knownDigests = knownDigests;
}
@Override
public ListenableFuture<ImmutableSet<Digest>> findMissingDigests(
RemoteActionExecutionContext context, Iterable<Digest> digests) {
ImmutableSet.Builder<Digest> missingDigests = ImmutableSet.builder();
for (Digest digest : digests) {
if (!knownDigests.contains(digest)) {
missingDigests.add(digest);
}
}
return Futures.immediateFuture(missingDigests.build());
}
}
private static class AllMissingDigestsFinder implements MissingDigestsFinder {
@Override
public ListenableFuture<ImmutableSet<Digest>> findMissingDigests(
RemoteActionExecutionContext context, Iterable<Digest> digests) {
return Futures.immediateFuture(ImmutableSet.copyOf(digests));
}
}
}