// blob: 4fc872325f9ca4a205f06c71da323198a27b8511 [file] [log] [blame]
// Copyright 2019 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.remote.blobstore;
import com.google.common.base.Preconditions;
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.vfs.Path;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.UUID;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * A {@link SimpleBlobStore} implementation combining two blob stores: a local disk blob store and
 * a remote HTTP blob store. If a blob isn't found in the disk store, the HTTP store is consulted
 * and, on a hit, the blob is added to the disk store. Puts write the blob to both stores.
 */
public final class CombinedDiskHttpBlobStore extends OnDiskBlobStore {
  private static final Logger logger = Logger.getLogger(CombinedDiskHttpBlobStore.class.getName());

  /** The remote store consulted when the disk store misses. Never null. */
  private final SimpleBlobStore bsHttp;

  /**
   * Creates a combined store.
   *
   * @param root directory handed to the {@link OnDiskBlobStore} superclass
   * @param bsHttp the remote blob store; must not be null
   * @throws NullPointerException if {@code bsHttp} is null
   */
  public CombinedDiskHttpBlobStore(Path root, SimpleBlobStore bsHttp) {
    super(root);
    this.bsHttp = Preconditions.checkNotNull(bsHttp);
  }

  /**
   * Always throws.
   *
   * @throws UnsupportedOperationException unconditionally
   */
  @Override
  public boolean containsKey(String key) {
    // HTTP cache does not support containsKey.
    // Don't support it here either for predictable semantics.
    throw new UnsupportedOperationException("HTTP Caching does not use this method.");
  }

  /**
   * Fetches the blob for {@code key} into {@code out}.
   *
   * <p>If the disk store already has the key, the disk store serves it. Otherwise the blob is
   * downloaded from the HTTP store into a uniquely named temporary file, which on a hit is renamed
   * to the key's path and then copied into {@code out}.
   *
   * @param key the blob key
   * @param out destination for the blob's bytes
   * @return a future yielding {@code true} if the blob was found and written to {@code out},
   *     {@code false} if neither store had it; the future fails if the download or any of the
   *     local file operations fail
   */
  @Override
  public ListenableFuture<Boolean> get(String key, OutputStream out) {
    if (super.containsKey(key)) {
      return super.get(key, out);
    }

    // Write a temporary file first, and then rename, to avoid data corruption in case of a crash.
    Path temp = toPath(UUID.randomUUID().toString());
    OutputStream tempOut;
    try {
      tempOut = temp.getOutputStream();
    } catch (IOException e) {
      return Futures.immediateFailedFuture(e);
    }

    ListenableFuture<Boolean> download;
    try {
      download = bsHttp.get(key, tempOut);
    } catch (RuntimeException e) {
      // The remote store failed before handing back a future, so no listener will ever run;
      // release the temporary file here instead of leaking it.
      discardTemp(temp, tempOut);
      throw e;
    }

    ListenableFuture<Boolean> chained =
        Futures.transformAsync(
            download,
            (found) -> {
              if (!found) {
                // The completion listener below closes and removes the temporary file.
                return Futures.immediateFuture(false);
              }
              // Close the temporary file before renaming and re-reading it, so that every
              // downloaded byte has been handed to the file system and no handle is still open
              // on the file being moved.
              tempOut.close();
              Path target = toPath(key);
              // The following note and line is taken from OnDiskBlobStore.java
              // TODO(ulfjack): Fsync temp here before we rename it to avoid data loss in the
              // case of machine crashes (the OS may reorder the writes and the rename).
              temp.renameTo(target);
              try (InputStream in = target.getInputStream()) {
                ByteStreams.copy(in, out);
              }
              return Futures.immediateFuture(true);
            },
            MoreExecutors.directExecutor());

    // Runs on success, miss and failure alike: make sure the stream is closed and that no
    // temporary file is left behind when the blob was not promoted to its final path.
    chained.addListener(() -> discardTemp(temp, tempOut), MoreExecutors.directExecutor());
    return chained;
  }

  /**
   * Best-effort cleanup of a download's temporary file: closes {@code tempOut} and deletes {@code
   * temp}. Failures are logged rather than propagated because this runs either after another
   * error or after the file has already been used successfully.
   */
  private static void discardTemp(Path temp, OutputStream tempOut) {
    try {
      // May already be closed by the success path; closing a closed stream is expected to be a
      // no-op per the java.io.Closeable contract.
      tempOut.close();
    } catch (IOException e) {
      logger.log(Level.WARNING, "Failed to close temporary file on get", e);
    }
    try {
      // After a successful rename nothing should remain under the temporary name, in which case
      // this is expected to do nothing.
      temp.delete();
    } catch (IOException e) {
      logger.log(Level.WARNING, "Failed to delete temporary file on get", e);
    }
  }

  /**
   * Fetches the action result for {@code key} into {@code out}, trying the disk store first and
   * then the HTTP store. A result found only remotely is also stored on disk.
   *
   * @return {@code true} if either store had the action result, {@code false} otherwise
   * @throws IOException if reading from or writing to either store fails
   * @throws InterruptedException if an underlying store call is interrupted
   */
  @Override
  public boolean getActionResult(String key, OutputStream out)
      throws IOException, InterruptedException {
    if (super.getActionResult(key, out)) {
      return true;
    }

    // Buffer the remote result in memory so it can be written to disk and to the caller.
    try (ByteArrayOutputStream tmpOut = new ByteArrayOutputStream()) {
      if (bsHttp.getActionResult(key, tmpOut)) {
        byte[] tmp = tmpOut.toByteArray();
        super.putActionResult(key, tmp);
        ByteStreams.copy(new ByteArrayInputStream(tmp), out);
        return true;
      }
    }

    return false;
  }

  /**
   * Stores the blob on disk, then uploads it to the HTTP store by reading the file back from the
   * key's path.
   *
   * @throws IOException if either store fails
   * @throws InterruptedException if an underlying store call is interrupted
   */
  @Override
  public void put(String key, long length, InputStream in)
      throws IOException, InterruptedException {
    super.put(key, length, in);
    // "in" has been consumed by the disk write, so upload from the file that was just stored.
    try (InputStream inFile = toPath(key).getInputStream()) {
      bsHttp.put(key, length, inFile);
    }
  }

  /**
   * Stores the action result in both the disk store and the HTTP store.
   *
   * @throws IOException if either store fails
   * @throws InterruptedException if an underlying store call is interrupted
   */
  @Override
  public void putActionResult(String key, byte[] in) throws IOException, InterruptedException {
    super.putActionResult(key, in);
    bsHttp.putActionResult(key, in);
  }

  /** Closes both stores; the HTTP store is closed even if closing the disk store throws. */
  @Override
  public void close() {
    try {
      super.close();
    } finally {
      bsHttp.close();
    }
  }
}