| // Copyright 2016 The Bazel Authors. All rights reserved. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package com.google.devtools.build.lib.remote; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Predicate; |
| import com.google.common.collect.Iterators; |
| import com.google.devtools.build.lib.actions.ActionInput; |
| import com.google.devtools.build.lib.actions.ActionInputFileCache; |
| import com.google.devtools.build.lib.actions.cache.VirtualActionInput; |
| import com.google.devtools.build.lib.util.Preconditions; |
| import com.google.devtools.build.lib.vfs.Path; |
| import com.google.devtools.remoteexecution.v1test.Digest; |
| import java.io.ByteArrayInputStream; |
| import java.io.ByteArrayOutputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Iterator; |
| import java.util.NoSuchElementException; |
| import java.util.Objects; |
| import java.util.Set; |
| |
| /** An iterator-type object that transforms byte sources into a stream of Chunks. */ |
| public final class Chunker { |
| // This is effectively final, should be changed only in unit-tests! |
| private static int DEFAULT_CHUNK_SIZE = 1024 * 16; |
| private static byte[] EMPTY_BLOB = new byte[0]; |
| |
| @VisibleForTesting |
| static void setDefaultChunkSizeForTesting(int value) { |
| DEFAULT_CHUNK_SIZE = value; |
| } |
| |
| public static int getDefaultChunkSize() { |
| return DEFAULT_CHUNK_SIZE; |
| } |
| |
| /** A piece of a byte[] blob. */ |
| public static final class Chunk { |
| |
| private final Digest digest; |
| private final long offset; |
| // TODO(olaola): consider saving data in a different format that byte[]. |
| private final byte[] data; |
| |
| @VisibleForTesting |
| public Chunk(Digest digest, byte[] data, long offset) { |
| this.digest = digest; |
| this.data = data; |
| this.offset = offset; |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if (o == this) { |
| return true; |
| } |
| if (!(o instanceof Chunk)) { |
| return false; |
| } |
| Chunk other = (Chunk) o; |
| return other.offset == offset |
| && other.digest.equals(digest) |
| && Arrays.equals(other.data, data); |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hash(digest, offset, Arrays.hashCode(data)); |
| } |
| |
| public Digest getDigest() { |
| return digest; |
| } |
| |
| public long getOffset() { |
| return offset; |
| } |
| |
| // This returns a mutable copy, for efficiency. |
| public byte[] getData() { |
| return data; |
| } |
| } |
| |
| /** An Item is an opaque digestable source of bytes. */ |
| interface Item { |
| Digest getDigest() throws IOException; |
| |
| InputStream getInputStream() throws IOException; |
| } |
| |
| private final Iterator<Item> inputIterator; |
| private InputStream currentStream; |
| private Digest digest; |
| private long bytesLeft; |
| private final int chunkSize; |
| |
| Chunker(Iterator<Item> inputIterator, int chunkSize) throws IOException { |
| Preconditions.checkArgument(chunkSize > 0, "Chunk size must be greater than 0"); |
| this.inputIterator = inputIterator; |
| this.chunkSize = chunkSize; |
| advanceInput(); |
| } |
| |
| Chunker(Item input, int chunkSize) throws IOException { |
| this(Iterators.singletonIterator(input), chunkSize); |
| } |
| |
| public void advanceInput() throws IOException { |
| if (inputIterator != null && inputIterator.hasNext()) { |
| Item input = inputIterator.next(); |
| digest = input.getDigest(); |
| currentStream = input.getInputStream(); |
| bytesLeft = digest.getSizeBytes(); |
| } else { |
| digest = null; |
| currentStream = null; |
| bytesLeft = 0; |
| } |
| } |
| |
| /** True if the object has more Chunk elements. */ |
| public boolean hasNext() { |
| return currentStream != null; |
| } |
| |
| /** Consume the next Chunk element. */ |
| public Chunk next() throws IOException { |
| if (!hasNext()) { |
| throw new NoSuchElementException(); |
| } |
| final long offset = digest.getSizeBytes() - bytesLeft; |
| byte[] blob = EMPTY_BLOB; |
| if (bytesLeft > 0) { |
| blob = new byte[(int) Math.min(bytesLeft, chunkSize)]; |
| currentStream.read(blob); |
| bytesLeft -= blob.length; |
| } |
| final Chunk result = new Chunk(digest, blob, offset); |
| if (bytesLeft == 0) { |
| currentStream.close(); |
| advanceInput(); // Sets the current stream to null, if it was the last. |
| } |
| return result; |
| } |
| |
| static Item toItem(final byte[] blob) { |
| return new Item() { |
| Digest digest = null; |
| |
| @Override |
| public Digest getDigest() throws IOException { |
| if (digest == null) { |
| digest = Digests.computeDigest(blob); |
| } |
| return digest; |
| } |
| |
| @Override |
| public InputStream getInputStream() throws IOException { |
| return new ByteArrayInputStream(blob); |
| } |
| }; |
| } |
| |
| static Item toItem(final Path file) { |
| return new Item() { |
| Digest digest = null; |
| |
| @Override |
| public Digest getDigest() throws IOException { |
| if (digest == null) { |
| digest = Digests.computeDigest(file); |
| } |
| return digest; |
| } |
| |
| @Override |
| public InputStream getInputStream() throws IOException { |
| return file.getInputStream(); |
| } |
| }; |
| } |
| |
| static Item toItem( |
| final ActionInput input, final ActionInputFileCache inputCache, final Path execRoot) { |
| if (input instanceof VirtualActionInput) { |
| return toItem((VirtualActionInput) input); |
| } |
| return new Item() { |
| @Override |
| public Digest getDigest() throws IOException { |
| return Digests.getDigestFromInputCache(input, inputCache); |
| } |
| |
| @Override |
| public InputStream getInputStream() throws IOException { |
| return execRoot.getRelative(input.getExecPathString()).getInputStream(); |
| } |
| }; |
| } |
| |
| static Item toItem(final VirtualActionInput input) { |
| return new Item() { |
| Digest digest = null; |
| |
| @Override |
| public Digest getDigest() throws IOException { |
| if (digest == null) { |
| digest = Digests.computeDigest(input); |
| } |
| return digest; |
| } |
| |
| @Override |
| public InputStream getInputStream() throws IOException { |
| ByteArrayOutputStream buffer = new ByteArrayOutputStream(); |
| input.writeTo(buffer); |
| return new ByteArrayInputStream(buffer.toByteArray()); |
| } |
| }; |
| } |
| |
| /** |
| * Create a Chunker from a given ActionInput, taking its digest from the provided |
| * ActionInputFileCache. |
| */ |
| public static Chunker from( |
| ActionInput input, int chunkSize, ActionInputFileCache inputCache, Path execRoot) |
| throws IOException { |
| return new Chunker(toItem(input, inputCache, execRoot), chunkSize); |
| } |
| |
| /** |
| * Create a Chunker from a given ActionInput, taking its digest from the provided |
| * ActionInputFileCache. |
| */ |
| public static Chunker from(ActionInput input, ActionInputFileCache inputCache, Path execRoot) |
| throws IOException { |
| return from(input, getDefaultChunkSize(), inputCache, execRoot); |
| } |
| |
| /** Create a Chunker from a given blob and chunkSize. */ |
| public static Chunker from(byte[] blob, int chunkSize) throws IOException { |
| return new Chunker(toItem(blob), chunkSize); |
| } |
| |
| /** Create a Chunker from a given blob. */ |
| public static Chunker from(byte[] blob) throws IOException { |
| return from(blob, getDefaultChunkSize()); |
| } |
| |
| /** Create a Chunker from a given Path and chunkSize. */ |
| public static Chunker from(Path file, int chunkSize) throws IOException { |
| return new Chunker(toItem(file), chunkSize); |
| } |
| |
| /** Create a Chunker from a given Path. */ |
| public static Chunker from(Path file) throws IOException { |
| return from(file, getDefaultChunkSize()); |
| } |
| |
| static class MemberOf implements Predicate<Item> { |
| private final Set<Digest> digests; |
| |
| public MemberOf(Set<Digest> digests) { |
| this.digests = digests; |
| } |
| |
| @Override |
| public boolean apply(Item item) { |
| try { |
| return digests.contains(item.getDigest()); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| } |
| |
| /** |
| * Create a Chunker from multiple input sources. The order of the sources provided to the Builder |
| * will be the same order they will be chunked by. |
| */ |
| public static final class Builder { |
| private final ArrayList<Item> items = new ArrayList<>(); |
| private Set<Digest> digests = null; |
| private int chunkSize = getDefaultChunkSize(); |
| |
| public Chunker build() throws IOException { |
| return new Chunker( |
| digests == null |
| ? items.iterator() |
| : Iterators.filter(items.iterator(), new MemberOf(digests)), |
| chunkSize); |
| } |
| |
| public Builder chunkSize(int chunkSize) { |
| this.chunkSize = chunkSize; |
| return this; |
| } |
| |
| /** |
| * Restricts the Chunker to use only inputs with these digests. This is an optimization for CAS |
| * uploads where a list of digests missing from the CAS is known. |
| */ |
| public Builder onlyUseDigests(Set<Digest> digests) { |
| this.digests = digests; |
| return this; |
| } |
| |
| public Builder addInput(byte[] blob) { |
| items.add(toItem(blob)); |
| return this; |
| } |
| |
| public Builder addInput(Path file) { |
| items.add(toItem(file)); |
| return this; |
| } |
| |
| public Builder addInput(ActionInput input, ActionInputFileCache inputCache, Path execRoot) { |
| items.add(toItem(input, inputCache, execRoot)); |
| return this; |
| } |
| |
| public Builder addAllInputs( |
| Collection<? extends ActionInput> inputs, ActionInputFileCache inputCache, Path execRoot) { |
| for (ActionInput input : inputs) { |
| items.add(toItem(input, inputCache, execRoot)); |
| } |
| return this; |
| } |
| } |
| } |