// blob: e393d70744a073de7f6b8c8d6c882c87e8075655 [file] [log] [blame]
// Copyright 2016 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.remote;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import build.bazel.remote.execution.v2.Digest;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.io.ByteStreams;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputHelper;
import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.ByteString;
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.function.Supplier;
/**
 * Splits a data source into one or more {@link Chunk}s of at most {@code chunkSize} bytes.
 *
 * <p>Once a data source has been fully consumed, that is once {@link #hasNext()} returns {@code
 * false}, the chunker has already closed the underlying data source (i.e. file) itself. However,
 * in case of an error, or when a data source does not get fully consumed, the user must call
 * {@link #reset()} manually to release it.
 */
public final class Chunker {

  private static int defaultChunkSize = 1024 * 16;

  /** This method must only be called in tests! */
  @VisibleForTesting
  static void setDefaultChunkSizeForTesting(int value) {
    defaultChunkSize = value;
  }

  /** Returns the chunk size that a {@link Builder} uses unless one is set explicitly. */
  static int getDefaultChunkSize() {
    return defaultChunkSize;
  }

  /** A piece of a byte[] blob. */
  public static final class Chunk {

    private final Digest digest;
    private final long offset;
    private final ByteString data;

    private Chunk(Digest digest, ByteString data, long offset) {
      this.digest = digest;
      this.data = data;
      this.offset = offset;
    }

    /** Returns the digest this chunk was created with. */
    public Digest getDigest() {
      return digest;
    }

    /** Returns the position of the first byte of {@link #getData()} within the blob. */
    public long getOffset() {
      return offset;
    }

    /** Returns the bytes of this chunk. */
    public ByteString getData() {
      return data;
    }

    @Override
    public boolean equals(Object o) {
      if (o == this) {
        return true;
      }
      if (!(o instanceof Chunk)) {
        return false;
      }
      Chunk other = (Chunk) o;
      return other.offset == offset
          && Objects.equals(other.digest, digest)
          && Objects.equals(other.data, data);
    }

    @Override
    public int hashCode() {
      return Objects.hash(digest, offset, data);
    }
  }

  private final Supplier<InputStream> dataSupplier;
  private final Digest digest;
  private final int chunkSize;
  private final Chunk emptyChunk;

  // The currently open data source, or null if none is open.
  private InputStream data;
  // Number of bytes already handed out via next().
  private long offset;
  // Scratch buffer reused across next() calls; allocated lazily.
  private byte[] chunkCache;

  // Set to true on the first call to next(). This is so that the Chunker can open its data source
  // lazily on the first call to next(), as opposed to opening it in the constructor or on reset().
  private boolean initialized;

  /**
   * Creates a chunker over the stream produced by {@code dataSupplier}.
   *
   * @param dataSupplier opens the data source; invoked lazily on the first {@link #next()} call
   *     after construction or after {@link #reset()}
   * @param digest digest of the data; its size determines how many bytes are read
   * @param chunkSize maximum number of bytes per {@link Chunk}
   * @param digestUtil used to compute the digest of the empty chunk
   */
  Chunker(Supplier<InputStream> dataSupplier, Digest digest, int chunkSize, DigestUtil digestUtil) {
    this.dataSupplier = checkNotNull(dataSupplier);
    this.digest = checkNotNull(digest);
    this.chunkSize = chunkSize;
    this.emptyChunk = new Chunk(digestUtil.compute(new byte[0]), ByteString.EMPTY, 0);
  }

  /** Returns the digest this chunker was created with. */
  public Digest digest() {
    return digest;
  }

  /**
   * Reset the {@link Chunker} state to when it was newly constructed.
   *
   * <p>Closes any open resources (file handles, ...). The state is reset even if closing the data
   * source fails.
   *
   * @throws IOException if closing the data source fails
   */
  public void reset() throws IOException {
    // Reset the bookkeeping first so that a failing close() cannot leave the chunker half-reset.
    offset = 0;
    initialized = false;
    releaseData();
  }

  /**
   * Returns {@code true} if a subsequent call to {@link #next()} returns a {@link Chunk} object.
   */
  public boolean hasNext() {
    return data != null || !initialized;
  }

  /**
   * Returns the next {@link Chunk} or throws a {@link NoSuchElementException} if no data is left.
   *
   * <p>Always call {@link #hasNext()} before calling this method.
   *
   * <p>Zero byte inputs are treated special. Instead of throwing a {@link NoSuchElementException}
   * on the first call to {@link #next()}, a {@link Chunk} with an empty {@link ByteString} is
   * returned.
   *
   * @throws IOException if opening, reading or closing the data source fails
   * @throws IllegalStateException if the data source ends before the number of bytes announced by
   *     the digest has been read
   * @throws NoSuchElementException if no data is left
   */
  public Chunk next() throws IOException {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }

    maybeInitialize();

    if (digest.getSizeBytes() == 0) {
      // maybeInitialize() has opened the data source; close it rather than just dropping the
      // reference, otherwise the handle leaks for every empty input.
      releaseData();
      return emptyChunk;
    }

    // The cast to int is safe, because the return value is capped at chunkSize.
    int bytesToRead = (int) Math.min(bytesLeft(), chunkSize);
    if (bytesToRead == 0) {
      releaseData();
      throw new NoSuchElementException();
    }

    if (chunkCache == null) {
      // Lazily allocate it in order to save memory on small data.
      // 1) bytesToRead < chunkSize: There will only ever be one next() call.
      // 2) bytesToRead == chunkSize: chunkCache will be set to its biggest possible value.
      // 3) bytesToRead > chunkSize: Not possible, due to Math.min above.
      chunkCache = new byte[bytesToRead];
    }

    long offsetBefore = offset;
    try {
      ByteStreams.readFully(data, chunkCache, 0, bytesToRead);
    } catch (EOFException e) {
      throw new IllegalStateException("Reached EOF, but expected "
          + bytesToRead + " bytes.", e);
    }
    offset += bytesToRead;

    // Copy out of the scratch buffer, since it is overwritten by the next call.
    ByteString blob = ByteString.copyFrom(chunkCache, 0, bytesToRead);

    if (bytesLeft() == 0) {
      // Fully consumed: release the data source eagerly so callers need not call reset().
      releaseData();
    }

    return new Chunk(digest, blob, offsetBefore);
  }

  /** Returns the number of bytes announced by the digest that have not been read yet. */
  private long bytesLeft() {
    return digest.getSizeBytes() - offset;
  }

  /**
   * Drops the scratch buffer and closes the data source, if one is open.
   *
   * <p>The fields are cleared before closing, so the chunker never keeps a reference to a stream
   * whose {@code close()} has already been attempted.
   */
  private void releaseData() throws IOException {
    InputStream stream = data;
    data = null;
    chunkCache = null;
    if (stream != null) {
      stream.close();
    }
  }

  /** Opens the data source on the first call after construction or {@link #reset()}. */
  private void maybeInitialize() throws IOException {
    if (initialized) {
      return;
    }
    checkState(data == null);
    checkState(offset == 0);
    checkState(chunkCache == null);
    try {
      data = dataSupplier.get();
    } catch (RuntimeException e) {
      // The suppliers created by the Builder wrap IOException in a RuntimeException, because
      // Supplier.get() cannot throw checked exceptions. Surface the original IOException here.
      Throwables.propagateIfPossible(e.getCause(), IOException.class);
      throw e;
    }
    initialized = true;
  }

  /** Returns a new {@link Builder} that uses {@code digestUtil} to compute digests. */
  public static Builder builder(DigestUtil digestUtil) {
    return new Builder(digestUtil);
  }

  /** Builder class for the Chunker */
  public static class Builder {

    private final DigestUtil digestUtil;
    private int chunkSize = getDefaultChunkSize();
    private Digest digest;
    private Supplier<InputStream> inputStream;

    Builder(DigestUtil digestUtil) {
      this.digestUtil = digestUtil;
    }

    /**
     * Uses {@code data} as the input and computes its digest. May only be called if no input has
     * been set yet.
     */
    public Builder setInput(byte[] data) {
      Preconditions.checkState(inputStream == null);
      digest = digestUtil.compute(data);
      inputStream = () -> new ByteArrayInputStream(data);
      return this;
    }

    /**
     * Uses {@code data} as the input, with the given precomputed {@code digest}. May only be
     * called if no input has been set yet.
     */
    public Builder setInput(Digest digest, byte[] data) {
      Preconditions.checkState(inputStream == null);
      this.digest = digest;
      inputStream = () -> new ByteArrayInputStream(data);
      return this;
    }

    /**
     * Uses the contents of {@code file} as the input, with the given precomputed {@code digest}.
     * May only be called if no input has been set yet.
     */
    public Builder setInput(Digest digest, Path file) {
      Preconditions.checkState(inputStream == null);
      this.digest = digest;
      inputStream =
          () -> {
            try {
              return file.getInputStream();
            } catch (IOException e) {
              // Unwrapped again by Chunker#maybeInitialize.
              throw new RuntimeException(e);
            }
          };
      return this;
    }

    /**
     * Uses {@code actionInput} as the input, with the given precomputed {@code digest}. A {@link
     * VirtualActionInput} is read from its in-memory bytes; any other input is read from its path
     * resolved against {@code execRoot}. May only be called if no input has been set yet.
     */
    public Builder setInput(Digest digest, ActionInput actionInput, Path execRoot) {
      Preconditions.checkState(inputStream == null);
      this.digest = digest;
      if (actionInput instanceof VirtualActionInput) {
        this.inputStream =
            () -> {
              try {
                return ((VirtualActionInput) actionInput).getBytes().newInput();
              } catch (IOException e) {
                // Unwrapped again by Chunker#maybeInitialize.
                throw new RuntimeException(e);
              }
            };
      } else {
        inputStream =
            () -> {
              try {
                return ActionInputHelper.toInputPath(actionInput, execRoot).getInputStream();
              } catch (IOException e) {
                // Unwrapped again by Chunker#maybeInitialize.
                throw new RuntimeException(e);
              }
            };
      }
      return this;
    }

    /** Sets the maximum number of bytes per chunk, overriding the default chunk size. */
    public Builder setChunkSize(int chunkSize) {
      this.chunkSize = chunkSize;
      return this;
    }

    /**
     * Creates the {@link Chunker}.
     *
     * @throws NullPointerException if no input has been set or the digest is {@code null}
     */
    public Chunker build() {
      // Two separate checks: checkNotNull(a, b) would treat b as the error message, not check it.
      Preconditions.checkNotNull(inputStream, "input must be set before calling build()");
      Preconditions.checkNotNull(digest, "digest must not be null");
      return new Chunker(inputStream, digest, chunkSize, digestUtil);
    }
  }
}