| // Copyright 2017 The Bazel Authors. All rights reserved. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| package com.google.devtools.build.lib.remote; |
| |
| import static com.google.common.base.Preconditions.checkArgument; |
| import static com.google.common.base.Preconditions.checkNotNull; |
| import static com.google.common.base.Preconditions.checkState; |
| import static java.lang.String.format; |
| import static java.util.Collections.singletonList; |
| import static java.util.concurrent.TimeUnit.SECONDS; |
| |
| import build.bazel.remote.execution.v2.Digest; |
| import com.google.bytestream.ByteStreamGrpc; |
| import com.google.bytestream.ByteStreamProto.WriteRequest; |
| import com.google.bytestream.ByteStreamProto.WriteResponse; |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Strings; |
| import com.google.common.base.Throwables; |
| import com.google.common.hash.HashCode; |
| import com.google.common.util.concurrent.Futures; |
| import com.google.common.util.concurrent.ListenableFuture; |
| import com.google.common.util.concurrent.MoreExecutors; |
| import com.google.common.util.concurrent.SettableFuture; |
| import com.google.devtools.build.lib.remote.util.TracingMetadataUtils; |
| import io.grpc.CallCredentials; |
| import io.grpc.CallOptions; |
| import io.grpc.Channel; |
| import io.grpc.ClientCall; |
| import io.grpc.Context; |
| import io.grpc.Metadata; |
| import io.grpc.Status; |
| import io.grpc.StatusRuntimeException; |
| import io.netty.util.AbstractReferenceCounted; |
| import io.netty.util.ReferenceCounted; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Future; |
| import java.util.logging.Level; |
| import java.util.logging.Logger; |
| import javax.annotation.Nullable; |
| import javax.annotation.concurrent.GuardedBy; |
| |
| /** |
| * A client implementing the {@code Write} method of the {@code ByteStream} gRPC service. |
| * |
 * <p>The uploader supports reference counting so that it can easily be shared between components
 * with different lifecycles. After instantiation the reference count is {@code 1}.
 *
 * <p>See {@link ReferenceCounted} for more information on reference counting.
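 *
 * <p>A minimal usage sketch; the chunker construction is elided and the names are illustrative:
 *
 * <pre>{@code
 * ByteStreamUploader uploader = ...;
 * uploader.retain();  // shared with another component
 * try {
 *   uploader.uploadBlob(chunker, false);  // forceUpload = false
 * } finally {
 *   uploader.release();  // once the reference count reaches 0, in-flight uploads are cancelled
 * }
 * }</pre>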
| */ |
| class ByteStreamUploader extends AbstractReferenceCounted { |
| |
| private static final Logger logger = Logger.getLogger(ByteStreamUploader.class.getName()); |
| |
| private final String instanceName; |
| private final ReferenceCountedChannel channel; |
| private final CallCredentials callCredentials; |
| private final long callTimeoutSecs; |
| private final RemoteRetrier retrier; |
| |
| private final Object lock = new Object(); |
| |
  /** Contains the hash codes of already uploaded blobs. */
| @GuardedBy("lock") |
| private final Set<HashCode> uploadedBlobs = new HashSet<>(); |
| |
| @GuardedBy("lock") |
| private final Map<Digest, ListenableFuture<Void>> uploadsInProgress = new HashMap<>(); |
| |
| @GuardedBy("lock") |
| private boolean isShutdown; |
| |
| /** |
| * Creates a new instance. |
| * |
   * @param instanceName the instance name to be prepended to the resource name of the {@code
   *     Write} call. See the {@code ByteStream} service definition for details
| * @param channel the {@link io.grpc.Channel} to use for calls |
| * @param callCredentials the credentials to use for authentication. May be {@code null}, in which |
| * case no authentication is performed |
| * @param callTimeoutSecs the timeout in seconds after which a {@code Write} gRPC call must be |
| * complete. The timeout resets between retries |
   * @param retrier the {@link RemoteRetrier} whose backoff strategy is used for retry timings.
| */ |
| public ByteStreamUploader( |
| @Nullable String instanceName, |
| ReferenceCountedChannel channel, |
| @Nullable CallCredentials callCredentials, |
| long callTimeoutSecs, |
| RemoteRetrier retrier) { |
    checkArgument(callTimeoutSecs > 0, "callTimeoutSecs must be greater than 0.");
| |
| this.instanceName = instanceName; |
| this.channel = channel; |
| this.callCredentials = callCredentials; |
| this.callTimeoutSecs = callTimeoutSecs; |
| this.retrier = retrier; |
| } |
| |
| /** |
| * Uploads a BLOB, as provided by the {@link Chunker}, to the remote {@code ByteStream} service. |
| * The call blocks until the upload is complete, or throws an {@link Exception} in case of error. |
| * |
| * <p>Uploads are retried according to the specified {@link RemoteRetrier}. Retrying is |
| * transparent to the user of this API. |
| * |
   * <p>Trying to upload the same BLOB multiple times concurrently results in only one upload being
   * performed. This is transparent to the user of this API.
   *
   * @param chunker the data to upload.
   * @param forceUpload if {@code false}, the blob is not uploaded if it has previously been
   *     uploaded; if {@code true}, the blob is always uploaded.
   * @throws IOException when reading of the {@link Chunker}'s input source fails
| */ |
  public void uploadBlob(Chunker chunker, boolean forceUpload)
      throws IOException, InterruptedException {
| uploadBlobs(singletonList(chunker), forceUpload); |
| } |
| |
| /** |
| * Uploads a list of BLOBs concurrently to the remote {@code ByteStream} service. The call blocks |
   * until the upload of all BLOBs is complete, or throws an {@link Exception} after the first
   * upload fails. The remaining uploads continue in the background until they complete or the
   * {@link #shutdown()} method is called. Errors encountered by these uploads are swallowed.
   *
   * <p>Uploads are retried according to the specified {@link RemoteRetrier}. Retrying is
   * transparent to the user of this API.
   *
   * <p>Trying to upload the same BLOB multiple times concurrently results in only one upload being
   * performed. This is transparent to the user of this API.
   *
   * @param chunkers the data to upload.
   * @param forceUpload if {@code false}, a blob is not uploaded if it has previously been
   *     uploaded; if {@code true}, the blob is always uploaded.
   * @throws IOException when reading of a {@link Chunker}'s input source or uploading fails
| */ |
| public void uploadBlobs(Iterable<Chunker> chunkers, boolean forceUpload) |
| throws IOException, InterruptedException { |
| List<ListenableFuture<Void>> uploads = new ArrayList<>(); |
| |
| for (Chunker chunker : chunkers) { |
| uploads.add(uploadBlobAsync(chunker, forceUpload)); |
| } |
| |
| try { |
| for (ListenableFuture<Void> upload : uploads) { |
| upload.get(); |
| } |
| } catch (ExecutionException e) { |
| Throwable cause = e.getCause(); |
| Throwables.propagateIfInstanceOf(cause, IOException.class); |
| Throwables.propagateIfInstanceOf(cause, InterruptedException.class); |
| if (cause instanceof StatusRuntimeException) { |
| throw new IOException(cause); |
| } |
| Throwables.propagate(cause); |
| } |
| } |
| |
| /** |
| * Cancels all running uploads. The method returns immediately and does NOT wait for the uploads |
| * to be cancelled. |
| * |
| * <p>This method should not be called directly, but will be called implicitly when the |
| * reference count reaches {@code 0}. |
| */ |
| @VisibleForTesting |
| void shutdown() { |
| synchronized (lock) { |
| if (isShutdown) { |
| return; |
| } |
| isShutdown = true; |
      // Before cancelling, copy the futures to a separate list in order to avoid concurrently
      // iterating over and modifying the map (cancelling triggers a listener that removes the
      // entry from the map; the listener is executed in the same thread).
| List<Future<Void>> uploadsToCancel = new ArrayList<>(uploadsInProgress.values()); |
| for (Future<Void> upload : uploadsToCancel) { |
| upload.cancel(true); |
| } |
| } |
| } |
| |
| /** |
   * Uploads a BLOB asynchronously to the remote {@code ByteStream} service. The call returns
   * immediately; the upload's success or failure is reported through the returned future.
| * |
| * <p>Uploads are retried according to the specified {@link RemoteRetrier}. Retrying is |
| * transparent to the user of this API. |
| * |
   * <p>Trying to upload the same BLOB multiple times concurrently results in only one upload being
   * performed. This is transparent to the user of this API.
| * |
| * @param chunker the data to upload. |
   * @param forceUpload if {@code false}, the blob is not uploaded if it has previously been
   *     uploaded; if {@code true}, the blob is always uploaded.
   * @return a future that succeeds once the upload completes, and fails if reading the {@link
   *     Chunker}'s input source or the upload itself fails
| */ |
| public ListenableFuture<Void> uploadBlobAsync(Chunker chunker, boolean forceUpload) { |
| Digest digest = checkNotNull(chunker.digest()); |
| HashCode hash = HashCode.fromString(digest.getHash()); |
| |
| synchronized (lock) { |
      checkState(!isShutdown, "Must not call uploadBlobAsync after shutdown.");
| |
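      // De-duplication: a previously uploaded blob is skipped (unless the upload is forced), and
      // an in-flight upload of the same digest is joined rather than started a second time.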
| if (!forceUpload && uploadedBlobs.contains(hash)) { |
| return Futures.immediateFuture(null); |
| } |
| |
| ListenableFuture<Void> inProgress = uploadsInProgress.get(digest); |
| if (inProgress != null) { |
| return inProgress; |
| } |
| |
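      // Capture the caller's gRPC Context so that every (possibly retried) upload attempt runs
      // with the caller's tracing metadata attached; see
      // TracingMetadataUtils.headersFromCurrentContext().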
| Context ctx = Context.current(); |
| ListenableFuture<Void> uploadResult = |
| Futures.transform( |
| retrier.executeAsync(() -> ctx.call(() -> startAsyncUpload(chunker))), |
| (v) -> { |
| synchronized (lock) { |
| uploadedBlobs.add(hash); |
| } |
| return null; |
| }, |
| MoreExecutors.directExecutor()); |
| uploadsInProgress.put(digest, uploadResult); |
| uploadResult.addListener( |
| () -> { |
| synchronized (lock) { |
| uploadsInProgress.remove(digest); |
| } |
| }, |
| MoreExecutors.directExecutor()); |
| return uploadResult; |
| } |
| } |
| |
| @VisibleForTesting |
| boolean uploadsInProgress() { |
| synchronized (lock) { |
| return !uploadsInProgress.isEmpty(); |
| } |
| } |
| |
  /** Starts a blob upload and returns a future representing the upload. */
| private ListenableFuture<Void> startAsyncUpload(Chunker chunker) { |
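    // Reset the chunker so that a retried upload re-reads the data from the beginning.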
| try { |
| chunker.reset(); |
| } catch (IOException e) { |
| return Futures.immediateFailedFuture(e); |
| } |
| |
| SettableFuture<Void> currUpload = SettableFuture.create(); |
| AsyncUpload newUpload = |
| new AsyncUpload( |
| channel, callCredentials, callTimeoutSecs, instanceName, chunker, currUpload); |
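    // Propagate a cancellation of the returned future to the underlying gRPC call.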
| currUpload.addListener( |
| () -> { |
| if (currUpload.isCancelled()) { |
| newUpload.cancel(); |
| } |
| }, |
| MoreExecutors.directExecutor()); |
| newUpload.start(); |
| return currUpload; |
| } |
| |
| @Override |
| public ByteStreamUploader retain() { |
| return (ByteStreamUploader) super.retain(); |
| } |
| |
| @Override |
| public ByteStreamUploader retain(int increment) { |
| return (ByteStreamUploader) super.retain(increment); |
| } |
| |
| @Override |
| protected void deallocate() { |
| shutdown(); |
| channel.release(); |
| } |
| |
| @Override |
| public ReferenceCounted touch(Object o) { |
| return this; |
| } |
| |
| private static class AsyncUpload { |
| |
| private final Channel channel; |
| private final CallCredentials callCredentials; |
| private final long callTimeoutSecs; |
| private final String instanceName; |
| private final Chunker chunker; |
| private final SettableFuture<Void> uploadResult; |
| |
| private ClientCall<WriteRequest, WriteResponse> call; |
| |
| AsyncUpload( |
| Channel channel, |
| CallCredentials callCredentials, |
| long callTimeoutSecs, |
| String instanceName, |
| Chunker chunker, |
| SettableFuture<Void> uploadResult) { |
| this.channel = channel; |
| this.callCredentials = callCredentials; |
| this.callTimeoutSecs = callTimeoutSecs; |
| this.instanceName = instanceName; |
| this.chunker = chunker; |
| this.uploadResult = uploadResult; |
| } |
| |
| void start() { |
| CallOptions callOptions = |
| CallOptions.DEFAULT |
| .withCallCredentials(callCredentials) |
| .withDeadlineAfter(callTimeoutSecs, SECONDS); |
| call = channel.newCall(ByteStreamGrpc.getWriteMethod(), callOptions); |
| |
| ClientCall.Listener<WriteResponse> callListener = |
| new ClientCall.Listener<WriteResponse>() { |
| |
| private final WriteRequest.Builder requestBuilder = WriteRequest.newBuilder(); |
| private boolean callHalfClosed = false; |
| |
| @Override |
| public void onMessage(WriteResponse response) { |
              // TODO(buchgr): The ByteStream API allows resuming an upload from the committed
              // size.
| } |
| |
| @Override |
| public void onClose(Status status, Metadata trailers) { |
| if (status.isOk()) { |
| uploadResult.set(null); |
| } else { |
| uploadResult.setException(status.asRuntimeException()); |
| } |
| } |
| |
| @Override |
| public void onReady() { |
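              // gRPC flow control: isReady() returns false once the transport's outbound buffer
              // is full, and onReady() is invoked again once more messages can be accepted.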
| while (call.isReady()) { |
| if (!chunker.hasNext()) { |
| // call.halfClose() may only be called once. Guard against it being called more |
| // often. |
| // See: https://github.com/grpc/grpc-java/issues/3201 |
| if (!callHalfClosed) { |
| callHalfClosed = true; |
| // Every chunk has been written. No more work to do. |
| call.halfClose(); |
| } |
| return; |
| } |
| |
| try { |
| requestBuilder.clear(); |
| Chunker.Chunk chunk = chunker.next(); |
| |
| if (chunk.getOffset() == 0) { |
| // Resource name only needs to be set on the first write for each file. |
| requestBuilder.setResourceName(newResourceName(chunk.getDigest())); |
| } |
| |
| boolean isLastChunk = !chunker.hasNext(); |
| WriteRequest request = |
| requestBuilder |
| .setData(chunk.getData()) |
| .setWriteOffset(chunk.getOffset()) |
| .setFinishWrite(isLastChunk) |
| .build(); |
| |
| call.sendMessage(request); |
| } catch (IOException e) { |
| try { |
| chunker.reset(); |
| } catch (IOException e1) { |
| // This exception indicates that closing the underlying input stream failed. |
| // We don't expect this to ever happen, but don't want to swallow the exception |
| // completely. |
| logger.log(Level.WARNING, "Chunker failed closing data source.", e1); |
| } finally { |
| call.cancel("Failed to read next chunk.", e); |
| } |
| } |
| } |
| } |
| |
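            // Per the ByteStream protocol, upload resource names have the form
            // "{instanceName}/uploads/{uuid}/blobs/{hash}/{sizeBytes}"; the instance name prefix
            // is omitted when it is empty.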
| private String newResourceName(Digest digest) { |
| String resourceName = |
| format( |
| "uploads/%s/blobs/%s/%d", |
| UUID.randomUUID(), digest.getHash(), digest.getSizeBytes()); |
| if (!Strings.isNullOrEmpty(instanceName)) { |
| resourceName = instanceName + "/" + resourceName; |
| } |
| return resourceName; |
| } |
| }; |
| call.start(callListener, TracingMetadataUtils.headersFromCurrentContext()); |
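      // Write is a client-streaming method; the server sends a single WriteResponse once the
      // stream completes, so exactly one inbound message is requested.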
| call.request(1); |
| } |
| |
| void cancel() { |
| if (call != null) { |
| call.cancel("Cancelled by user.", null); |
| } |
| } |
| } |
| } |