blob: 8925640c11ccbc76fb7a0a2eb5664f7c1847b97b [file] [log] [blame]
// Copyright 2019 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.remote.util;
import build.bazel.remote.execution.v2.ActionResult;
import build.bazel.remote.execution.v2.Digest;
import com.google.common.collect.ImmutableSet;
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/** A {@link RemoteCacheClient} that stores its contents in memory. */
public final class InMemoryCacheClient implements RemoteCacheClient {
private final ListeningExecutorService executorService =
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(100));
private final ConcurrentMap<Digest, Exception> downloadFailures = new ConcurrentHashMap<>();
private final ConcurrentMap<ActionKey, ActionResult> ac = new ConcurrentHashMap<>();
private final ConcurrentMap<Digest, byte[]> cas;
private AtomicInteger numSuccess = new AtomicInteger();
private AtomicInteger numFailures = new AtomicInteger();
private final ConcurrentMap<Digest, AtomicInteger> numFindMissingDigests =
new ConcurrentHashMap<>();
public InMemoryCacheClient(Map<Digest, byte[]> casEntries) {
this.cas = new ConcurrentHashMap<>();
for (Map.Entry<Digest, byte[]> entry : casEntries.entrySet()) {
cas.put(entry.getKey(), entry.getValue());
}
}
public InMemoryCacheClient() {
this.cas = new ConcurrentHashMap<>();
}
public void addDownloadFailure(Digest digest, Exception e) {
downloadFailures.put(digest, e);
}
public int getNumSuccessfulDownloads() {
return numSuccess.get();
}
public int getNumFailedDownloads() {
return numFailures.get();
}
public Map<Digest, Integer> getNumFindMissingDigests() {
return numFindMissingDigests.entrySet().stream()
.map(entry -> new SimpleEntry<>(entry.getKey(), entry.getValue().get()))
.collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue));
}
@Override
public ListenableFuture<Void> downloadBlob(
RemoteActionExecutionContext context, Digest digest, OutputStream out) {
Exception failure = downloadFailures.get(digest);
if (failure != null) {
numFailures.incrementAndGet();
return Futures.immediateFailedFuture(failure);
}
byte[] data = cas.get(digest);
if (data == null) {
return Futures.immediateFailedFuture(new CacheNotFoundException(digest));
}
try {
out.write(data);
out.flush();
} catch (IOException e) {
numFailures.incrementAndGet();
return Futures.immediateFailedFuture(e);
}
numSuccess.incrementAndGet();
return Futures.immediateFuture(null);
}
@Override
public ListenableFuture<CachedActionResult> downloadActionResult(
RemoteActionExecutionContext context, ActionKey actionKey, boolean inlineOutErr) {
ActionResult actionResult = ac.get(actionKey);
if (actionResult == null) {
return Futures.immediateFailedFuture(new CacheNotFoundException(actionKey.getDigest()));
}
return Futures.immediateFuture(CachedActionResult.remote(actionResult));
}
@Override
public ListenableFuture<Void> uploadActionResult(
RemoteActionExecutionContext context, ActionKey actionKey, ActionResult actionResult) {
ac.put(actionKey, actionResult);
return Futures.immediateFuture(null);
}
@Override
public ListenableFuture<Void> uploadFile(
RemoteActionExecutionContext context, Digest digest, Path file) {
try (InputStream in = file.getInputStream()) {
cas.put(digest, ByteStreams.toByteArray(in));
} catch (IOException e) {
return Futures.immediateFailedFuture(e);
}
return Futures.immediateFuture(null);
}
@Override
public ListenableFuture<Void> uploadBlob(
RemoteActionExecutionContext context, Digest digest, ByteString data) {
try (InputStream in = data.newInput()) {
cas.put(digest, data.toByteArray());
} catch (IOException e) {
return Futures.immediateFailedFuture(e);
}
return Futures.immediateFuture(null);
}
@Override
public ListenableFuture<ImmutableSet<Digest>> findMissingDigests(
RemoteActionExecutionContext context, Iterable<Digest> digests) {
return executorService.submit(
() -> {
ImmutableSet.Builder<Digest> missingBuilder = ImmutableSet.builder();
for (Digest digest : digests) {
numFindMissingDigests
.computeIfAbsent(digest, (key) -> new AtomicInteger(0))
.incrementAndGet();
if (!cas.containsKey(digest)) {
missingBuilder.add(digest);
}
}
return missingBuilder.build();
});
}
@Override
public void close() {
cas.clear();
ac.clear();
}
}