// blob: 0c407a29b7beed146119c3676d5a91369d8ab116 [file] [log] [blame]
// Copyright 2016 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.remote;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterators;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputFileCache;
import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
import com.google.devtools.build.lib.util.Preconditions;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.remoteexecution.v1test.Digest;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
/**
 * An iterator-type object that transforms byte sources into a stream of {@link Chunk}s.
 *
 * <p>Inputs are consumed in the order given. Each input is split into consecutive chunks holding
 * at most {@code chunkSize} bytes; an input whose digest reports a size of zero yields a single
 * chunk with empty data.
 */
public final class Chunker {
  // This is effectively final, should be changed only in unit-tests!
  private static int DEFAULT_CHUNK_SIZE = 1024 * 16;

  /** Shared zero-length payload used for chunks of empty inputs. */
  private static final byte[] EMPTY_BLOB = new byte[0];

  /** Overrides the default chunk size; intended for tests only. */
  @VisibleForTesting
  static void setDefaultChunkSizeForTesting(int value) {
    DEFAULT_CHUNK_SIZE = value;
  }

  /** Returns the chunk size used by the factory methods that do not take one explicitly. */
  public static int getDefaultChunkSize() {
    return DEFAULT_CHUNK_SIZE;
  }

  /** A piece of a byte[] blob. */
  public static final class Chunk {
    /** Digest of the whole input this chunk was cut from. */
    private final Digest digest;

    /** Position of the first byte of {@link #data} within the input. */
    private final long offset;

    // TODO(olaola): consider saving data in a different format than byte[].
    private final byte[] data;

    /**
     * Creates a chunk.
     *
     * @param digest digest of the input the chunk belongs to
     * @param data the chunk payload; stored as-is, without copying
     * @param offset position of the first payload byte within the input
     */
    @VisibleForTesting
    public Chunk(Digest digest, byte[] data, long offset) {
      this.digest = digest;
      this.data = data;
      this.offset = offset;
    }

    /** Two chunks are equal when their offset, digest and payload bytes are all equal. */
    @Override
    public boolean equals(Object o) {
      if (o == this) {
        return true;
      }
      if (!(o instanceof Chunk)) {
        return false;
      }
      Chunk other = (Chunk) o;
      // Objects.equals keeps equals() consistent with hashCode(), which tolerates a null digest.
      return other.offset == offset
          && Objects.equals(other.digest, digest)
          && Arrays.equals(other.data, data);
    }

    @Override
    public int hashCode() {
      return Objects.hash(digest, offset, Arrays.hashCode(data));
    }

    /** Returns the digest of the input this chunk belongs to. */
    public Digest getDigest() {
      return digest;
    }

    /** Returns the position of the first byte of this chunk within its input. */
    public long getOffset() {
      return offset;
    }

    /**
     * Returns the chunk payload.
     *
     * <p>For efficiency this is the internal array itself, not a copy, so any modification made
     * by the caller is visible through this chunk.
     */
    public byte[] getData() {
      return data;
    }
  }

  /** An Item is an opaque digestable source of bytes. */
  interface Item {
    /** Returns the digest of the item's content. */
    Digest getDigest() throws IOException;

    /** Opens a new stream over the item's content. */
    InputStream getInputStream() throws IOException;
  }

  /** Remaining inputs; the input currently being chunked has already been taken from it. */
  private final Iterator<Item> inputIterator;

  /** Stream of the input currently being chunked, or null once all inputs are exhausted. */
  private InputStream currentStream;

  /** Digest of the input currently being chunked, or null once all inputs are exhausted. */
  private Digest digest;

  /** Number of bytes of the current input that have not been emitted yet. */
  private long bytesLeft;

  /** Maximum number of payload bytes per chunk; always positive. */
  private final int chunkSize;

  /**
   * Creates a Chunker over the given inputs and positions it on the first one.
   *
   * @param inputIterator the inputs to chunk, in order
   * @param chunkSize maximum payload size of a chunk; must be greater than 0
   * @throws IOException if the digest or stream of the first input cannot be obtained
   */
  Chunker(Iterator<Item> inputIterator, int chunkSize) throws IOException {
    Preconditions.checkArgument(chunkSize > 0, "Chunk size must be greater than 0");
    this.inputIterator = inputIterator;
    this.chunkSize = chunkSize;
    advanceInput();
  }

  /** Creates a Chunker over a single input. */
  Chunker(Item input, int chunkSize) throws IOException {
    this(Iterators.singletonIterator(input), chunkSize);
  }

  /**
   * Abandons the current input, if any, and moves on to the next one.
   *
   * <p>The stream of the current input is closed first so that skipping a partially consumed
   * input does not leak it. When no inputs remain, the Chunker becomes exhausted and {@link
   * #hasNext()} returns false.
   *
   * @throws IOException if closing the current stream fails, or if the digest or stream of the
   *     next input cannot be obtained
   */
  public void advanceInput() throws IOException {
    if (currentStream != null) {
      currentStream.close();
    }
    if (inputIterator != null && inputIterator.hasNext()) {
      Item input = inputIterator.next();
      digest = input.getDigest();
      currentStream = input.getInputStream();
      bytesLeft = digest.getSizeBytes();
    } else {
      digest = null;
      currentStream = null;
      bytesLeft = 0;
    }
  }

  /** True if the object has more Chunk elements. */
  public boolean hasNext() {
    return currentStream != null;
  }

  /**
   * Consume the next Chunk element.
   *
   * @return the next chunk of the current input
   * @throws NoSuchElementException if there are no more chunks
   * @throws IOException if reading fails, or if the input ends before the number of bytes
   *     announced by its digest has been read
   */
  public Chunk next() throws IOException {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    final long offset = digest.getSizeBytes() - bytesLeft;
    byte[] blob = EMPTY_BLOB;
    if (bytesLeft > 0) {
      blob = new byte[(int) Math.min(bytesLeft, chunkSize)];
      // A single InputStream.read() call may return fewer bytes than requested, so keep reading
      // until the whole chunk is filled.
      readFully(currentStream, blob);
      bytesLeft -= blob.length;
    }
    final Chunk result = new Chunk(digest, blob, offset);
    if (bytesLeft == 0) {
      // Closes the exhausted stream and sets the current stream to null, if it was the last.
      advanceInput();
    }
    return result;
  }

  /** Fills {@code buffer} completely from {@code in}, failing if the stream ends early. */
  private static void readFully(InputStream in, byte[] buffer) throws IOException {
    int filled = 0;
    while (filled < buffer.length) {
      int read = in.read(buffer, filled, buffer.length - filled);
      if (read < 0) {
        throw new IOException(
            "Unexpected end of input: expected "
                + buffer.length
                + " bytes but only "
                + filled
                + " were available");
      }
      filled += read;
    }
  }

  /** Wraps an in-memory blob as an Item; the digest is computed on first use and then reused. */
  static Item toItem(final byte[] blob) {
    return new Item() {
      Digest digest = null;

      @Override
      public Digest getDigest() throws IOException {
        if (digest == null) {
          digest = Digests.computeDigest(blob);
        }
        return digest;
      }

      @Override
      public InputStream getInputStream() throws IOException {
        return new ByteArrayInputStream(blob);
      }
    };
  }

  /** Wraps a file as an Item; the digest is computed on first use and then reused. */
  static Item toItem(final Path file) {
    return new Item() {
      Digest digest = null;

      @Override
      public Digest getDigest() throws IOException {
        if (digest == null) {
          digest = Digests.computeDigest(file);
        }
        return digest;
      }

      @Override
      public InputStream getInputStream() throws IOException {
        return file.getInputStream();
      }
    };
  }

  /**
   * Wraps an ActionInput as an Item.
   *
   * <p>A {@link VirtualActionInput} is handled by {@link #toItem(VirtualActionInput)}. For any
   * other input the digest is looked up through {@code inputCache} and the content is read from
   * the input's exec path resolved against {@code execRoot}.
   */
  static Item toItem(
      final ActionInput input, final ActionInputFileCache inputCache, final Path execRoot) {
    if (input instanceof VirtualActionInput) {
      return toItem((VirtualActionInput) input);
    }
    return new Item() {
      @Override
      public Digest getDigest() throws IOException {
        return Digests.getDigestFromInputCache(input, inputCache);
      }

      @Override
      public InputStream getInputStream() throws IOException {
        return execRoot.getRelative(input.getExecPathString()).getInputStream();
      }
    };
  }

  /**
   * Wraps a VirtualActionInput as an Item; the digest is computed on first use and then reused.
   *
   * <p>Each call to {@code getInputStream()} writes the input into a fresh in-memory buffer and
   * returns a stream over that buffer.
   */
  static Item toItem(final VirtualActionInput input) {
    return new Item() {
      Digest digest = null;

      @Override
      public Digest getDigest() throws IOException {
        if (digest == null) {
          digest = Digests.computeDigest(input);
        }
        return digest;
      }

      @Override
      public InputStream getInputStream() throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        input.writeTo(buffer);
        return new ByteArrayInputStream(buffer.toByteArray());
      }
    };
  }

  /**
   * Create a Chunker from a given ActionInput and chunkSize, taking its digest from the provided
   * ActionInputFileCache.
   */
  public static Chunker from(
      ActionInput input, int chunkSize, ActionInputFileCache inputCache, Path execRoot)
      throws IOException {
    return new Chunker(toItem(input, inputCache, execRoot), chunkSize);
  }

  /**
   * Create a Chunker from a given ActionInput, taking its digest from the provided
   * ActionInputFileCache. The default chunk size is used.
   */
  public static Chunker from(ActionInput input, ActionInputFileCache inputCache, Path execRoot)
      throws IOException {
    return from(input, getDefaultChunkSize(), inputCache, execRoot);
  }

  /** Create a Chunker from a given blob and chunkSize. */
  public static Chunker from(byte[] blob, int chunkSize) throws IOException {
    return new Chunker(toItem(blob), chunkSize);
  }

  /** Create a Chunker from a given blob, using the default chunk size. */
  public static Chunker from(byte[] blob) throws IOException {
    return from(blob, getDefaultChunkSize());
  }

  /** Create a Chunker from a given Path and chunkSize. */
  public static Chunker from(Path file, int chunkSize) throws IOException {
    return new Chunker(toItem(file), chunkSize);
  }

  /** Create a Chunker from a given Path, using the default chunk size. */
  public static Chunker from(Path file) throws IOException {
    return from(file, getDefaultChunkSize());
  }

  /** Predicate accepting only the Items whose digest is contained in a given set. */
  static class MemberOf implements Predicate<Item> {
    private final Set<Digest> digests;

    public MemberOf(Set<Digest> digests) {
      this.digests = digests;
    }

    /**
     * Returns true if the item's digest is in the set.
     *
     * @throws RuntimeException wrapping the IOException if the item's digest cannot be obtained,
     *     since the Predicate interface does not allow checked exceptions
     */
    @Override
    public boolean apply(Item item) {
      try {
        return digests.contains(item.getDigest());
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  }

  /**
   * Create a Chunker from multiple input sources. The order of the sources provided to the Builder
   * will be the same order they will be chunked by.
   */
  public static final class Builder {
    private final ArrayList<Item> items = new ArrayList<>();

    /** Digests to restrict the Chunker to, or null to use every added input. */
    private Set<Digest> digests = null;

    private int chunkSize = getDefaultChunkSize();

    /**
     * Builds the Chunker over the inputs added so far, filtered by the digest set if one was
     * given through {@link #onlyUseDigests}.
     *
     * @throws IOException if the first input cannot be opened
     */
    public Chunker build() throws IOException {
      return new Chunker(
          digests == null
              ? items.iterator()
              : Iterators.filter(items.iterator(), new MemberOf(digests)),
          chunkSize);
    }

    /** Sets the maximum chunk payload size; it is validated when {@link #build()} is called. */
    public Builder chunkSize(int chunkSize) {
      this.chunkSize = chunkSize;
      return this;
    }

    /**
     * Restricts the Chunker to use only inputs with these digests. This is an optimization for CAS
     * uploads where a list of digests missing from the CAS is known.
     */
    public Builder onlyUseDigests(Set<Digest> digests) {
      this.digests = digests;
      return this;
    }

    /** Appends an in-memory blob to the inputs. */
    public Builder addInput(byte[] blob) {
      items.add(toItem(blob));
      return this;
    }

    /** Appends a file to the inputs. */
    public Builder addInput(Path file) {
      items.add(toItem(file));
      return this;
    }

    /** Appends an ActionInput to the inputs. */
    public Builder addInput(ActionInput input, ActionInputFileCache inputCache, Path execRoot) {
      items.add(toItem(input, inputCache, execRoot));
      return this;
    }

    /** Appends all given ActionInputs to the inputs, in the collection's iteration order. */
    public Builder addAllInputs(
        Collection<? extends ActionInput> inputs, ActionInputFileCache inputCache, Path execRoot) {
      for (ActionInput input : inputs) {
        items.add(toItem(input, inputCache, execRoot));
      }
      return this;
    }
  }
}