| // Copyright 2017 The Bazel Authors. All rights reserved. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| package com.google.devtools.build.lib.remote; |
| |
| import static com.google.common.base.Preconditions.checkArgument; |
| import static com.google.common.base.Preconditions.checkState; |
| import static java.lang.String.format; |
| import static java.util.Collections.singletonMap; |
| import static java.util.concurrent.TimeUnit.SECONDS; |
| |
| import com.google.bytestream.ByteStreamGrpc; |
| import com.google.bytestream.ByteStreamGrpc.ByteStreamFutureStub; |
| import com.google.bytestream.ByteStreamProto.QueryWriteStatusRequest; |
| import com.google.bytestream.ByteStreamProto.WriteRequest; |
| import com.google.bytestream.ByteStreamProto.WriteResponse; |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Strings; |
| import com.google.common.base.Throwables; |
| import com.google.common.hash.HashCode; |
| import com.google.common.util.concurrent.Futures; |
| import com.google.common.util.concurrent.ListenableFuture; |
| import com.google.common.util.concurrent.MoreExecutors; |
| import com.google.common.util.concurrent.SettableFuture; |
| import com.google.devtools.build.lib.remote.RemoteRetrier.ProgressiveBackoff; |
| import com.google.devtools.build.lib.remote.util.TracingMetadataUtils; |
| import io.grpc.CallCredentials; |
| import io.grpc.CallOptions; |
| import io.grpc.Channel; |
| import io.grpc.ClientCall; |
| import io.grpc.Context; |
| import io.grpc.Metadata; |
| import io.grpc.Status; |
| import io.grpc.Status.Code; |
| import io.grpc.StatusRuntimeException; |
| import io.netty.util.AbstractReferenceCounted; |
| import io.netty.util.ReferenceCounted; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.CancellationException; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.logging.Level; |
| import java.util.logging.Logger; |
| import javax.annotation.Nullable; |
| import javax.annotation.concurrent.GuardedBy; |
| |
| /** |
| * A client implementing the {@code Write} method of the {@code ByteStream} gRPC service. |
| * |
| * <p>The uploader supports reference counting so that it can easily be shared between components |
| * with different lifecycles. After instantiation the reference count is {@code 1}. |
| * |
| * <p>See {@link ReferenceCounted} for more information on reference counting. |
| */ |
| class ByteStreamUploader extends AbstractReferenceCounted { |
| |
| /** Logger for non-fatal problems, e.g. a {@link Chunker} failing to close its data source. */ |
| private static final Logger logger = Logger.getLogger(ByteStreamUploader.class.getName()); |
| |
| /** Instance name prepended to upload resource names; may be {@code null} or empty. */ |
| private final String instanceName; |
| |
| /** Channel on which all calls are made; released in {@link #deallocate()}. */ |
| private final ReferenceCountedChannel channel; |
| |
| /** Credentials attached to every call; may be {@code null}. */ |
| private final CallCredentials callCredentials; |
| |
| /** Deadline, in seconds, applied to each individual gRPC call. */ |
| private final long callTimeoutSecs; |
| |
| /** Supplies the retry policy and backoff used by every upload. */ |
| private final RemoteRetrier retrier; |
| |
| /** Monitor guarding the mutable bookkeeping state below. */ |
| private final Object lock = new Object(); |
| |
| /** Hash codes of the blobs whose upload has completed successfully. */ |
| @GuardedBy("lock") |
| private final Set<HashCode> uploadedBlobs = new HashSet<>(); |
| |
| /** Futures of the uploads that have been started but have not finished yet, keyed by hash. */ |
| @GuardedBy("lock") |
| private final Map<HashCode, ListenableFuture<Void>> uploadsInProgress = new HashMap<>(); |
| |
| /** Becomes {@code true} once {@link #shutdown()} has run; new uploads are rejected afterwards. */ |
| @GuardedBy("lock") |
| private boolean isShutdown; |
| |
| /** |
|  * Creates a new uploader. |
|  * |
|  * @param instanceName the instance name to be prepended to the resource name of the {@code Write} |
|  *     call. See the {@code ByteStream} service definition for details. May be {@code null} |
|  * @param channel the {@link io.grpc.Channel} to use for calls |
|  * @param callCredentials the credentials to use for authentication. May be {@code null}, in which |
|  *     case no authentication is performed |
|  * @param callTimeoutSecs the timeout in seconds after which a {@code Write} gRPC call must be |
|  *     complete. The timeout resets between retries. Must be greater than {@code 0} |
|  * @param retrier the {@link RemoteRetrier} whose backoff strategy to use for retry timings |
|  * @throws IllegalArgumentException if {@code callTimeoutSecs} is not positive |
|  */ |
| public ByteStreamUploader( |
|     @Nullable String instanceName, |
|     ReferenceCountedChannel channel, |
|     @Nullable CallCredentials callCredentials, |
|     long callTimeoutSecs, |
|     RemoteRetrier retrier) { |
|   checkArgument(callTimeoutSecs > 0, "callTimeoutSecs must be gt 0."); |
| |
|   this.retrier = retrier; |
|   this.callTimeoutSecs = callTimeoutSecs; |
|   this.callCredentials = callCredentials; |
|   this.channel = channel; |
|   this.instanceName = instanceName; |
| } |
| |
| /** |
|  * Uploads a single BLOB, as provided by the {@link Chunker}, to the remote {@code ByteStream} |
|  * service and blocks until the upload has completed or failed. |
|  * |
|  * <p>This is a convenience wrapper that delegates to {@link #uploadBlobs(Map, boolean)} with a |
|  * one-element map, so retrying and de-duplication of concurrent uploads of the same BLOB behave |
|  * exactly as described there. |
|  * |
|  * @param hash the hash of the data to upload. |
|  * @param chunker the data to upload. |
|  * @param forceUpload if {@code false} the blob is not uploaded if it has previously been |
|  *     uploaded, if {@code true} the blob is uploaded. |
|  * @throws IOException when reading of the {@link Chunker}s input source or uploading fails |
|  * @throws InterruptedException if the calling thread is interrupted while waiting |
|  */ |
| public void uploadBlob(HashCode hash, Chunker chunker, boolean forceUpload) |
|     throws IOException, InterruptedException { |
|   Map<HashCode, Chunker> singleBlob = singletonMap(hash, chunker); |
|   uploadBlobs(singleBlob, forceUpload); |
| } |
| |
| /** |
|  * Uploads a set of BLOBs concurrently to the remote {@code ByteStream} service and blocks until |
|  * all of them have been uploaded, or until the first failure is observed. |
|  * |
|  * <p>When an upload fails, this method throws and the remaining uploads keep running in the |
|  * background until they complete or {@link #shutdown()} is called. Errors encountered by those |
|  * remaining uploads are swallowed. |
|  * |
|  * <p>Uploads are retried according to the specified {@link RemoteRetrier}. Retrying is |
|  * transparent to the user of this API. |
|  * |
|  * <p>Trying to upload the same BLOB multiple times concurrently results in only one upload being |
|  * performed. This is transparent to the user of this API. |
|  * |
|  * @param chunkers the data to upload, keyed by the hash of each BLOB. |
|  * @param forceUpload if {@code false} the blob is not uploaded if it has previously been |
|  *     uploaded, if {@code true} the blob is uploaded. |
|  * @throws IOException when reading of the {@link Chunker}s input source or uploading fails |
|  * @throws InterruptedException if the calling thread is interrupted while waiting |
|  */ |
| public void uploadBlobs(Map<HashCode, Chunker> chunkers, boolean forceUpload) |
|     throws IOException, InterruptedException { |
|   // Start every upload before waiting on any of them so that they can proceed concurrently. |
|   List<ListenableFuture<Void>> pending = new ArrayList<>(); |
|   for (Map.Entry<HashCode, Chunker> entry : chunkers.entrySet()) { |
|     HashCode hash = entry.getKey(); |
|     Chunker chunker = entry.getValue(); |
|     pending.add(uploadBlobAsync(hash, chunker, forceUpload)); |
|   } |
| |
|   try { |
|     for (ListenableFuture<Void> future : pending) { |
|       future.get(); |
|     } |
|   } catch (ExecutionException failure) { |
|     Throwable cause = failure.getCause(); |
|     // Checked exceptions declared by this method are rethrown unchanged. |
|     if (cause instanceof IOException) { |
|       throw (IOException) cause; |
|     } |
|     if (cause instanceof InterruptedException) { |
|       throw (InterruptedException) cause; |
|     } |
|     // gRPC failures are surfaced to callers as I/O errors. |
|     if (cause instanceof StatusRuntimeException) { |
|       throw new IOException(cause); |
|     } |
|     Throwables.propagate(cause); |
|   } |
| } |
| |
| /** |
|  * Requests cancellation of every upload that is currently in progress and marks this uploader as |
|  * shut down. The method returns immediately and does NOT wait for the uploads to be cancelled. |
|  * Calling it again after the first call has no effect. |
|  * |
|  * <p>This method should not be called directly, but will be called implicitly when the |
|  * reference count reaches {@code 0}. |
|  */ |
| @VisibleForTesting |
| void shutdown() { |
|   synchronized (lock) { |
|     if (!isShutdown) { |
|       isShutdown = true; |
|       // Cancelling a future triggers a listener, executed in this same thread, that removes the |
|       // future's entry from the map. Iterate over a snapshot so that the map is not modified |
|       // while it is being traversed. |
|       List<Future<Void>> snapshot = new ArrayList<>(uploadsInProgress.values()); |
|       snapshot.forEach(pendingUpload -> pendingUpload.cancel(true)); |
|     } |
|   } |
| } |
| |
| /** |
| * Uploads a BLOB asynchronously to the remote {@code ByteStream} service. The call returns |
| * immediately and one can listen to the returned future for the success/failure of the upload. |
| * |
| * <p>Uploads are retried according to the specified {@link RemoteRetrier}. Retrying is |
| * transparent to the user of this API. |
| * |
| * <p>Trying to upload the same BLOB multiple times concurrently results in only one upload being |
| * performed: while an upload for {@code hash} is in progress, the future of that upload is |
| * returned to every caller, regardless of {@code forceUpload}. |
| * |
| * <p>The returned future completes only after the internal bookkeeping has been updated, i.e. |
| * after the upload has been removed from the uploads in progress and, on success, its hash has |
| * been recorded as uploaded. Cancelling the returned future cancels the underlying upload. |
| * |
| * <p>Failures, including a failure to reset the {@link Chunker}s input source, are not thrown by |
| * this method but are reported through the returned future. |
| * |
| * @param hash the hash of the data to upload. |
| * @param chunker the data to upload. |
| * @param forceUpload if {@code false} the blob is not uploaded if it has previously been |
| * uploaded, if {@code true} the blob is uploaded. |
| * @return a future that completes with {@code null} once the blob is uploaded, or fails with the |
| * cause of the upload failure. It is already completed if the blob was uploaded previously and |
| * {@code forceUpload} is {@code false}. |
| * @throws IllegalStateException if this uploader has already been shut down |
| */ |
| public ListenableFuture<Void> uploadBlobAsync( |
| HashCode hash, Chunker chunker, boolean forceUpload) { |
| synchronized (lock) { |
| checkState(!isShutdown, "Must not call uploadBlobs after shutdown."); |
| |
| if (!forceUpload && uploadedBlobs.contains(hash)) { |
| return Futures.immediateFuture(null); |
| } |
| |
| ListenableFuture<Void> inProgress = uploadsInProgress.get(hash); |
| if (inProgress != null) { |
| return inProgress; |
| } |
| |
| ListenableFuture<Void> uploadResult = |
| Futures.transform( |
| startAsyncUpload(hash, chunker), |
| (v) -> { |
| synchronized (lock) { |
| uploadedBlobs.add(hash); |
| } |
| return null; |
| }, |
| MoreExecutors.directExecutor()); |
| // A future that only completes once the upload and internal state updates have |
| // been completed. |
| SettableFuture<Void> uploadAndBookkeepingComplete = SettableFuture.create(); |
| uploadsInProgress.put(hash, uploadAndBookkeepingComplete); |
| uploadAndBookkeepingComplete.addListener( |
| () -> { |
| if (uploadAndBookkeepingComplete.isCancelled()) { |
| uploadResult.cancel(true); |
| } |
| }, |
| MoreExecutors.directExecutor()); |
| uploadResult.addListener( |
| () -> { |
| synchronized (lock) { |
| uploadsInProgress.remove(hash); |
| try { |
| uploadResult.get(); |
| uploadAndBookkeepingComplete.set(null); |
| } catch (ExecutionException e) { |
| uploadAndBookkeepingComplete.setException(e.getCause()); |
| } catch (CancellationException e) { |
| uploadAndBookkeepingComplete.cancel(true); |
| } catch (Throwable e) { |
| uploadAndBookkeepingComplete.setException(e); |
| } |
| } |
| }, |
| MoreExecutors.directExecutor()); |
| return uploadAndBookkeepingComplete; |
| } |
| } |
| |
| /** Returns {@code true} if at least one upload has been started and has not finished yet. */ |
| @VisibleForTesting |
| boolean uploadsInProgress() { |
|   boolean anyPending; |
|   synchronized (lock) { |
|     anyPending = !uploadsInProgress.isEmpty(); |
|   } |
|   return anyPending; |
| } |
| |
| /** |
|  * Builds the resource name of an upload: {@code uploads/<uuid>/blobs/<hash>/<size>}, prefixed |
|  * with {@code <instanceName>/} when an instance name is given. |
|  * |
|  * @param instanceName the instance name to prepend; ignored when {@code null} or empty |
|  * @param uuid the identifier of this upload |
|  * @param hash the hash of the blob being uploaded |
|  * @param size the size of the blob |
|  * @return the resource name to use for the upload |
|  */ |
| private static String uploadResourceName( |
|     String instanceName, UUID uuid, HashCode hash, long size) { |
|   String uploadPath = format("uploads/%s/blobs/%s/%d", uuid, hash, size); |
|   if (Strings.isNullOrEmpty(instanceName)) { |
|     return uploadPath; |
|   } |
|   return instanceName + "/" + uploadPath; |
| } |
| |
| /** |
|  * Starts a file upload and returns a future representing the upload. |
|  * |
|  * <p>The chunker is reset first; if that fails, an already failed future carrying the |
|  * {@link IOException} is returned and no upload is started. Cancelling the returned future |
|  * cancels the underlying upload. |
|  */ |
| private ListenableFuture<Void> startAsyncUpload(HashCode hash, Chunker chunker) { |
|   try { |
|     chunker.reset(); |
|   } catch (IOException resetFailure) { |
|     return Futures.immediateFailedFuture(resetFailure); |
|   } |
| |
|   // Each upload gets a fresh random id as part of its resource name. |
|   String resourceName = |
|       uploadResourceName(instanceName, UUID.randomUUID(), hash, chunker.getSize()); |
|   AsyncUpload upload = |
|       new AsyncUpload(channel, callCredentials, callTimeoutSecs, retrier, resourceName, chunker); |
|   ListenableFuture<Void> uploadFuture = upload.start(); |
| |
|   // Propagate a cancellation of the future to the upload itself. |
|   Runnable cancelUploadIfCancelled = |
|       () -> { |
|         if (uploadFuture.isCancelled()) { |
|           upload.cancel(); |
|         } |
|       }; |
|   uploadFuture.addListener(cancelUploadIfCancelled, MoreExecutors.directExecutor()); |
|   return uploadFuture; |
| } |
| |
| @Override |
| public ByteStreamUploader retain() { |
| return (ByteStreamUploader) super.retain(); |
| } |
| |
| @Override |
| public ByteStreamUploader retain(int increment) { |
| return (ByteStreamUploader) super.retain(increment); |
| } |
| |
| /** |
|  * Cancels all running uploads via {@link #shutdown()} and then releases this uploader's |
|  * reference to the channel. |
|  */ |
| @Override |
| protected void deallocate() { |
|   // Stop the uploads first, then give up the channel they were using. |
|   shutdown(); |
|   channel.release(); |
| } |
| |
| /** Does nothing with the given hint and returns this uploader. */ |
| @Override |
| public ReferenceCounted touch(Object o) { |
|   return this; |
| } |
| |
| private static class AsyncUpload { |
| |
| /** Channel on which the {@code Write} and {@code QueryWriteStatus} calls are made. */ |
| private final Channel channel; |
| |
| /** Credentials attached to every call. */ |
| private final CallCredentials callCredentials; |
| |
| /** Deadline, in seconds, applied to each individual call. */ |
| private final long callTimeoutSecs; |
| |
| /** Decides whether a failure is retriable and supplies the backoff between attempts. */ |
| private final Retrier retrier; |
| |
| /** Resource name identifying this upload on the server. */ |
| private final String resourceName; |
| |
| /** Source of the data to upload. */ |
| private final Chunker chunker; |
| |
| /** |
|  * The most recently created {@code Write} call, or {@code null} if none has been created yet. |
|  * Assigned by {@link #call(AtomicLong)} and used by {@link #cancel()}. |
|  */ |
| private ClientCall<WriteRequest, WriteResponse> call; |
| |
| /** Creates an upload of {@code chunker}'s data to {@code resourceName}; nothing is sent yet. */ |
| AsyncUpload( |
|     Channel channel, |
|     CallCredentials callCredentials, |
|     long callTimeoutSecs, |
|     Retrier retrier, |
|     String resourceName, |
|     Chunker chunker) { |
|   this.chunker = chunker; |
|   this.resourceName = resourceName; |
|   this.retrier = retrier; |
|   this.callTimeoutSecs = callTimeoutSecs; |
|   this.callCredentials = callCredentials; |
|   this.channel = channel; |
| } |
| |
| /** |
|  * Starts the upload, retrying failed attempts through the retrier. |
|  * |
|  * <p>Each attempt runs in the gRPC {@link Context} that was current when this method was called. |
|  * Once the retrier reports success, the committed offset is compared against the chunker's size; |
|  * if they differ the returned future fails with an {@link IOException}. |
|  * |
|  * @return a future that completes with {@code null} when the whole blob has been committed |
|  */ |
| ListenableFuture<Void> start() { |
|   Context callerContext = Context.current(); |
|   ProgressiveBackoff backoff = new ProgressiveBackoff(retrier::newBackoff); |
|   // Shared across attempts so that a retry can continue from what was already committed. |
|   AtomicLong committedOffset = new AtomicLong(0); |
| |
|   ListenableFuture<Void> attempts = |
|       retrier.executeAsync( |
|           () -> callerContext.call(() -> callAndQueryOnFailure(committedOffset, backoff)), |
|           backoff); |
|   return Futures.transformAsync( |
|       attempts, |
|       (unused) -> { |
|         long committedSize = committedOffset.get(); |
|         long expected = chunker.getSize(); |
|         if (committedSize == expected) { |
|           return Futures.immediateFuture(null); |
|         } |
|         return Futures.immediateFailedFuture( |
|             new IOException( |
|                 format( |
|                     "write incomplete: committed_size %d for %d total", |
|                     committedSize, expected))); |
|       }, |
|       MoreExecutors.directExecutor()); |
| } |
| |
| /** |
|  * Creates a {@code ByteStream} future stub on the channel, configured with the tracing metadata |
|  * interceptor, the call credentials and a deadline of {@code callTimeoutSecs} seconds from now. |
|  */ |
| private ByteStreamFutureStub bsFutureStub() { |
|   ByteStreamFutureStub stub = ByteStreamGrpc.newFutureStub(channel); |
|   stub = stub.withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor()); |
|   stub = stub.withCallCredentials(callCredentials); |
|   return stub.withDeadlineAfter(callTimeoutSecs, SECONDS); |
| } |
| |
| /** |
|  * Performs one {@code Write} attempt and, if it fails with an {@link Exception}, hands the |
|  * failure to {@link #guardQueryWithSuppression}. The fallback runs in the gRPC {@link Context} |
|  * that is current when this method is called. |
|  */ |
| private ListenableFuture<Void> callAndQueryOnFailure( |
|     AtomicLong committedOffset, ProgressiveBackoff progressiveBackoff) { |
|   ListenableFuture<Void> writeAttempt = call(committedOffset); |
|   return Futures.catchingAsync( |
|       writeAttempt, |
|       Exception.class, |
|       (writeFailure) -> |
|           guardQueryWithSuppression(writeFailure, committedOffset, progressiveBackoff), |
|       Context.current().fixedContextExecutor(MoreExecutors.directExecutor())); |
| } |
| |
| /** |
|  * Handles a failed {@code Write} attempt. The returned future always fails with {@code e}. |
|  * |
|  * <p>If {@code e} is retriable, the server is first queried for the committed size so that the |
|  * next attempt can resume from there; an exception thrown by that query is attached to |
|  * {@code e} as a suppressed exception. |
|  */ |
| private ListenableFuture<Void> guardQueryWithSuppression( |
|     Exception e, AtomicLong committedOffset, ProgressiveBackoff progressiveBackoff) { |
|   // Every path ends in this failure, so create it once and reuse it. |
|   ListenableFuture<Void> originalFailure = Futures.immediateFailedFuture(e); |
| |
|   // TODO(buchgr): we should also return immediately without the query if |
|   // we were out of retry attempts for the underlying backoff. This |
|   // is meant to be an only in-between-retries query request. |
|   if (retrier.isRetriable(e)) { |
|     ListenableFuture<Void> queryWithSuppression = |
|         Futures.catchingAsync( |
|             query(committedOffset, progressiveBackoff), |
|             Exception.class, |
|             (queryFailure) -> { |
|               // A failed query must not mask the original failure; record it as suppressed. |
|               e.addSuppressed(queryFailure); |
|               return originalFailure; |
|             }, |
|             MoreExecutors.directExecutor()); |
|     return Futures.transformAsync( |
|         queryWithSuppression, (ignored) -> originalFailure, MoreExecutors.directExecutor()); |
|   } |
|   return originalFailure; |
| } |
| |
| /** |
|  * Asks the server how many bytes of this upload it has committed and stores the answer in |
|  * {@code committedOffset}. |
|  * |
|  * <p>A server that answers {@code UNIMPLEMENTED} is treated as having committed {@code 0} bytes. |
|  * Any other query failure fails the returned future. If the reported size is larger than the |
|  * previously known offset, the backoff is reset. |
|  */ |
| private ListenableFuture<Void> query( |
|     AtomicLong committedOffset, ProgressiveBackoff progressiveBackoff) { |
|   ByteStreamFutureStub stub = bsFutureStub(); |
|   QueryWriteStatusRequest statusRequest = |
|       QueryWriteStatusRequest.newBuilder().setResourceName(resourceName).build(); |
|   ListenableFuture<Long> reportedSize = |
|       Futures.transform( |
|           stub.queryWriteStatus(statusRequest), |
|           (response) -> response.getCommittedSize(), |
|           MoreExecutors.directExecutor()); |
| |
|   ListenableFuture<Long> reportedSizeOrZero = |
|       Futures.catchingAsync( |
|           reportedSize, |
|           Exception.class, |
|           (queryFailure) -> { |
|             if (Status.fromThrowable(queryFailure).getCode() != Code.UNIMPLEMENTED) { |
|               return Futures.immediateFailedFuture(queryFailure); |
|             } |
|             // if the bytestream server does not implement the query, insist |
|             // that we should reset the upload |
|             return Futures.immediateFuture(0L); |
|           }, |
|           MoreExecutors.directExecutor()); |
| |
|   return Futures.transformAsync( |
|       reportedSizeOrZero, |
|       (committedSize) -> { |
|         boolean madeProgress = committedSize > committedOffset.get(); |
|         if (madeProgress) { |
|           // we have made progress on this upload in the last request, |
|           // reset the backoff so that this request has a full deck of retries |
|           progressiveBackoff.reset(); |
|         } |
|         committedOffset.set(committedSize); |
|         return Futures.immediateFuture(null); |
|       }, |
|       MoreExecutors.directExecutor()); |
| } |
| |
| /** |
|  * Performs a single {@code Write} call that streams the chunker's data to the server, starting |
|  * at {@code committedOffset}. |
|  * |
|  * <p>The newly created call is stored in the {@code call} field so that {@link #cancel()} can |
|  * reach it. If the chunker cannot seek to {@code committedOffset}, the chunker is reset and an |
|  * already failed future carrying the {@link IOException} is returned without starting the call. |
|  * |
|  * <p>Chunks are sent from the {@code onReady} callback for as long as the call reports that it |
|  * is ready. When the server responds, {@code committedOffset} is updated with the committed size |
|  * from the response and the call is half-closed. |
|  * |
|  * @param committedOffset the offset to resume from; updated from the server's response |
|  * @return a future that completes with {@code null} when the call closes with an OK status and |
|  *     fails with the status converted to an exception otherwise |
|  */ |
| private ListenableFuture<Void> call(AtomicLong committedOffset) { |
|   CallOptions callOptions = |
|       CallOptions.DEFAULT |
|           .withCallCredentials(callCredentials) |
|           .withDeadlineAfter(callTimeoutSecs, SECONDS); |
|   call = channel.newCall(ByteStreamGrpc.getWriteMethod(), callOptions); |
| |
|   try { |
|     chunker.seek(committedOffset.get()); |
|   } catch (IOException e) { |
|     try { |
|       chunker.reset(); |
|     } catch (IOException resetException) { |
|       e.addSuppressed(resetException); |
|     } |
|     return Futures.immediateFailedFuture(e); |
|   } |
| |
|   SettableFuture<Void> uploadResult = SettableFuture.create(); |
|   ClientCall.Listener<WriteResponse> callListener = |
|       new ClientCall.Listener<WriteResponse>() { |
| |
|         private final WriteRequest.Builder requestBuilder = WriteRequest.newBuilder(); |
|         private boolean callHalfClosed = false; |
| |
|         void halfClose() { |
|           // call.halfClose() may only be called once. Guard against it being called more |
|           // often. |
|           // See: https://github.com/grpc/grpc-java/issues/3201 |
|           if (!callHalfClosed) { |
|             callHalfClosed = true; |
|             // Every chunk has been written. No more work to do. |
|             call.halfClose(); |
|           } |
|         } |
| |
|         @Override |
|         public void onMessage(WriteResponse response) { |
|           // upload was completed either by us or someone else |
|           committedOffset.set(response.getCommittedSize()); |
|           halfClose(); |
|         } |
| |
|         @Override |
|         public void onClose(Status status, Metadata trailers) { |
|           if (status.isOk()) { |
|             uploadResult.set(null); |
|           } else { |
|             uploadResult.setException(status.asRuntimeException()); |
|           } |
|         } |
| |
|         @Override |
|         public void onReady() { |
|           while (call.isReady()) { |
|             if (!chunker.hasNext()) { |
|               halfClose(); |
|               return; |
|             } |
| |
|             if (callHalfClosed) { |
|               return; |
|             } |
| |
|             try { |
|               requestBuilder.clear(); |
|               Chunker.Chunk chunk = chunker.next(); |
| |
|               if (chunk.getOffset() == committedOffset.get()) { |
|                 // Resource name only needs to be set on the first write for each file. |
|                 requestBuilder.setResourceName(resourceName); |
|               } |
| |
|               boolean isLastChunk = !chunker.hasNext(); |
|               WriteRequest request = |
|                   requestBuilder |
|                       .setData(chunk.getData()) |
|                       .setWriteOffset(chunk.getOffset()) |
|                       .setFinishWrite(isLastChunk) |
|                       .build(); |
| |
|               call.sendMessage(request); |
|             } catch (IOException e) { |
|               try { |
|                 chunker.reset(); |
|               } catch (IOException e1) { |
|                 // This exception indicates that closing the underlying input stream failed. |
|                 // We don't expect this to ever happen, but don't want to swallow the exception |
|                 // completely. |
|                 logger.log(Level.WARNING, "Chunker failed closing data source.", e1); |
|               } finally { |
|                 call.cancel("Failed to read next chunk.", e); |
|               } |
|               // The call has been cancelled, so nothing more may be sent on it. Stop here |
|               // instead of relying on isReady() to turn false before the next iteration, |
|               // which would otherwise read from the just-reset chunker and send again. |
|               return; |
|             } |
|           } |
|         } |
|       }; |
|   call.start(callListener, TracingMetadataUtils.headersFromCurrentContext()); |
|   // Ask for the single response message of the Write call. |
|   call.request(1); |
|   return uploadResult; |
| } |
| |
| /** Cancels the current {@code Write} call, if one has been created; otherwise does nothing. */ |
| void cancel() { |
|   if (call == null) { |
|     return; |
|   } |
|   call.cancel("Cancelled by user.", null); |
| } |
| } |
| } |