| // Copyright 2017 The Bazel Authors. All rights reserved. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| package com.google.devtools.build.lib.remote; |
| |
| import static com.google.common.base.Preconditions.checkArgument; |
| import static com.google.common.base.Preconditions.checkNotNull; |
| import static com.google.common.base.Preconditions.checkState; |
| import static java.lang.String.format; |
| import static java.util.Collections.singletonList; |
| import static java.util.concurrent.TimeUnit.MILLISECONDS; |
| import static java.util.concurrent.TimeUnit.SECONDS; |
| |
| import com.google.bytestream.ByteStreamGrpc; |
| import com.google.bytestream.ByteStreamProto.WriteRequest; |
| import com.google.bytestream.ByteStreamProto.WriteResponse; |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Strings; |
| import com.google.common.base.Throwables; |
| import com.google.common.util.concurrent.ListenableFuture; |
| import com.google.common.util.concurrent.ListenableScheduledFuture; |
| import com.google.common.util.concurrent.ListeningScheduledExecutorService; |
| import com.google.common.util.concurrent.MoreExecutors; |
| import com.google.common.util.concurrent.SettableFuture; |
| import com.google.devtools.build.lib.remote.Retrier.RetryException; |
| import com.google.devtools.build.lib.remote.util.TracingMetadataUtils; |
| import com.google.devtools.remoteexecution.v1test.Digest; |
| import io.grpc.CallCredentials; |
| import io.grpc.CallOptions; |
| import io.grpc.Channel; |
| import io.grpc.ClientCall; |
| import io.grpc.Context; |
| import io.grpc.Metadata; |
| import io.grpc.Status; |
| import io.grpc.StatusException; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.UUID; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.RejectedExecutionException; |
| import java.util.logging.Level; |
| import java.util.logging.Logger; |
| import javax.annotation.Nullable; |
| import javax.annotation.concurrent.GuardedBy; |
| |
| /** |
| * A client implementing the {@code Write} method of the {@code ByteStream} gRPC service. |
| * |
| * <p>Users must call {@link #shutdown()} before exiting. |
| */ |
| final class ByteStreamUploader { |
| |
| private static final Logger logger = Logger.getLogger(ByteStreamUploader.class.getName()); |
| |
| private final String instanceName; |
| private final Channel channel; |
| private final CallCredentials callCredentials; |
| private final long callTimeoutSecs; |
| private final RemoteRetrier retrier; |
| private final ListeningScheduledExecutorService retryService; |
| |
| private final Object lock = new Object(); |
| |
| @GuardedBy("lock") |
| private final Map<Digest, ListenableFuture<Void>> uploadsInProgress = new HashMap<>(); |
| |
| @GuardedBy("lock") |
| private boolean isShutdown; |
| |
| /** |
| * Creates a new instance. |
| * |
| * @param instanceName the instance name to be prepended to resource name of the {@code Write} |
| * call. See the {@code ByteStream} service definition for details |
| * @param channel the {@link io.grpc.Channel} to use for calls |
| * @param callCredentials the credentials to use for authentication. May be {@code null}, in which |
| * case no authentication is performed |
| * @param callTimeoutSecs the timeout in seconds after which a {@code Write} gRPC call must be |
| * complete. The timeout resets between retries |
| * @param retrier the {@link RemoteRetrier} whose backoff strategy to use for retry timings. |
| * @param retryService the executor service to schedule retries on. It's the responsibility of the |
| * caller to properly shutdown the service after use. Users should avoid shutting down the |
| * service before {@link #shutdown()} has been called |
| */ |
| public ByteStreamUploader( |
| @Nullable String instanceName, |
| Channel channel, |
| @Nullable CallCredentials callCredentials, |
| long callTimeoutSecs, |
| RemoteRetrier retrier, |
| ListeningScheduledExecutorService retryService) { |
| checkArgument(callTimeoutSecs > 0, "callTimeoutSecs must be gt 0."); |
| |
| this.instanceName = instanceName; |
| this.channel = channel; |
| this.callCredentials = callCredentials; |
| this.callTimeoutSecs = callTimeoutSecs; |
| this.retrier = retrier; |
| this.retryService = retryService; |
| } |
| |
| /** |
| * Uploads a BLOB, as provided by the {@link Chunker}, to the remote {@code ByteStream} service. |
| * The call blocks until the upload is complete, or throws an {@link Exception} in case of error. |
| * |
| * <p>Uploads are retried according to the specified {@link RemoteRetrier}. Retrying is |
| * transparent to the user of this API. |
| * |
| * <p>Trying to upload the same BLOB multiple times concurrently, results in only one upload being |
| * performed. This is transparent to the user of this API. |
| * |
| * @throws IOException when reading of the {@link Chunker}s input source fails |
| * @throws RetryException when the upload failed after a retry |
| */ |
| public void uploadBlob(Chunker chunker) throws IOException, InterruptedException { |
| uploadBlobs(singletonList(chunker)); |
| } |
| |
| /** |
| * Uploads a list of BLOBs concurrently to the remote {@code ByteStream} service. The call blocks |
| * until the upload of all BLOBs is complete, or throws an {@link Exception} after the first |
| * upload failed. Any other uploads will continue uploading in the background, until they complete |
| * or the {@link #shutdown()} method is called. Errors encountered by these uploads are swallowed. |
| * |
| * <p>Uploads are retried according to the specified {@link RemoteRetrier}. Retrying is |
| * transparent to the user of this API. |
| * |
| * <p>Trying to upload the same BLOB multiple times concurrently, results in only one upload being |
| * performed. This is transparent to the user of this API. |
| * |
| * @throws IOException when reading of the {@link Chunker}s input source fails |
| * @throws RetryException when the upload failed after a retry |
| */ |
| public void uploadBlobs(Iterable<Chunker> chunkers) throws IOException, InterruptedException { |
| List<ListenableFuture<Void>> uploads = new ArrayList<>(); |
| |
| for (Chunker chunker : chunkers) { |
| uploads.add(uploadBlobAsync(chunker)); |
| } |
| |
| try { |
| for (ListenableFuture<Void> upload : uploads) { |
| upload.get(); |
| } |
| } catch (ExecutionException e) { |
| Throwable cause = e.getCause(); |
| if (cause instanceof RetryException) { |
| throw (RetryException) cause; |
| } else { |
| throw Throwables.propagate(cause); |
| } |
| } catch (InterruptedException e) { |
| Thread.interrupted(); |
| throw e; |
| } |
| } |
| |
| /** |
| * Cancels all running uploads. The method returns immediately and does NOT wait for the uploads |
| * to be cancelled. |
| * |
| * <p>This method must be the last method called. |
| */ |
| public void shutdown() { |
| synchronized (lock) { |
| if (isShutdown) { |
| return; |
| } |
| isShutdown = true; |
| // Before cancelling, copy the futures to a separate list in order to avoid concurrently |
| // iterating over and modifying the map (cancel triggers a listener that removes the entry |
| // from the map. the listener is executed in the same thread.). |
| List<Future<Void>> uploadsToCancel = new ArrayList<>(uploadsInProgress.values()); |
| for (Future<Void> upload : uploadsToCancel) { |
| upload.cancel(true); |
| } |
| } |
| } |
| |
| @VisibleForTesting |
| ListenableFuture<Void> uploadBlobAsync(Chunker chunker) |
| throws IOException { |
| Digest digest = checkNotNull(chunker.digest()); |
| |
| synchronized (lock) { |
| checkState(!isShutdown, "Must not call uploadBlobs after shutdown."); |
| |
| ListenableFuture<Void> uploadResult = uploadsInProgress.get(digest); |
| if (uploadResult == null) { |
| uploadResult = SettableFuture.create(); |
| uploadResult.addListener( |
| () -> { |
| synchronized (lock) { |
| uploadsInProgress.remove(digest); |
| } |
| }, |
| MoreExecutors.directExecutor()); |
| startAsyncUploadWithRetry( |
| chunker, retrier.newBackoff(), (SettableFuture<Void>) uploadResult); |
| uploadsInProgress.put(digest, uploadResult); |
| } |
| return uploadResult; |
| } |
| } |
| |
| @VisibleForTesting |
| boolean uploadsInProgress() { |
| synchronized (lock) { |
| return !uploadsInProgress.isEmpty(); |
| } |
| } |
| |
| private void startAsyncUploadWithRetry( |
| Chunker chunker, Retrier.Backoff backoffTimes, SettableFuture<Void> overallUploadResult) { |
| |
| AsyncUpload.Listener listener = |
| new AsyncUpload.Listener() { |
| @Override |
| public void success() { |
| overallUploadResult.set(null); |
| } |
| |
| @Override |
| public void failure(Status status) { |
| StatusException cause = status.asException(); |
| long nextDelayMillis = backoffTimes.nextDelayMillis(); |
| if (nextDelayMillis < 0 || !retrier.isRetriable(cause)) { |
| // Out of retries or status not retriable. |
| RetryException error = |
| new RetryException( |
| "Out of retries or status not retriable.", |
| backoffTimes.getRetryAttempts(), |
| cause); |
| overallUploadResult.setException(error); |
| } else { |
| retryAsyncUpload(nextDelayMillis, chunker, backoffTimes, overallUploadResult); |
| } |
| } |
| |
| private void retryAsyncUpload( |
| long nextDelayMillis, |
| Chunker chunker, |
| Retrier.Backoff backoffTimes, |
| SettableFuture<Void> overallUploadResult) { |
| try { |
| ListenableScheduledFuture<?> schedulingResult = |
| retryService.schedule( |
| Context.current() |
| .wrap( |
| () -> |
| startAsyncUploadWithRetry( |
| chunker, backoffTimes, overallUploadResult)), |
| nextDelayMillis, |
| MILLISECONDS); |
| // In case the scheduled execution errors, we need to notify the overallUploadResult. |
| schedulingResult.addListener( |
| () -> { |
| try { |
| schedulingResult.get(); |
| } catch (Exception e) { |
| overallUploadResult.setException( |
| new RetryException( |
| "Scheduled execution errored.", backoffTimes.getRetryAttempts(), e)); |
| } |
| }, |
| MoreExecutors.directExecutor()); |
| } catch (RejectedExecutionException e) { |
| // May be thrown by .schedule(...) if i.e. the executor is shutdown. |
| overallUploadResult.setException( |
| new RetryException("Rejected by executor.", backoffTimes.getRetryAttempts(), e)); |
| } |
| } |
| }; |
| |
| try { |
| chunker.reset(); |
| } catch (IOException e) { |
| overallUploadResult.setException(e); |
| return; |
| } |
| |
| AsyncUpload newUpload = |
| new AsyncUpload(channel, callCredentials, callTimeoutSecs, instanceName, chunker, listener); |
| overallUploadResult.addListener( |
| () -> { |
| if (overallUploadResult.isCancelled()) { |
| newUpload.cancel(); |
| } |
| }, |
| MoreExecutors.directExecutor()); |
| newUpload.start(); |
| } |
| |
| private static class AsyncUpload { |
| |
| interface Listener { |
| void success(); |
| |
| void failure(Status status); |
| } |
| |
| private final Channel channel; |
| private final CallCredentials callCredentials; |
| private final long callTimeoutSecs; |
| private final String instanceName; |
| private final Chunker chunker; |
| private final Listener listener; |
| |
| private ClientCall<WriteRequest, WriteResponse> call; |
| |
| AsyncUpload( |
| Channel channel, |
| CallCredentials callCredentials, |
| long callTimeoutSecs, |
| String instanceName, |
| Chunker chunker, |
| Listener listener) { |
| this.channel = channel; |
| this.callCredentials = callCredentials; |
| this.callTimeoutSecs = callTimeoutSecs; |
| this.instanceName = instanceName; |
| this.chunker = chunker; |
| this.listener = listener; |
| } |
| |
| void start() { |
| CallOptions callOptions = |
| CallOptions.DEFAULT |
| .withCallCredentials(callCredentials) |
| .withDeadlineAfter(callTimeoutSecs, SECONDS); |
| call = channel.newCall(ByteStreamGrpc.getWriteMethod(), callOptions); |
| |
| ClientCall.Listener<WriteResponse> callListener = |
| new ClientCall.Listener<WriteResponse>() { |
| |
| private final WriteRequest.Builder requestBuilder = WriteRequest.newBuilder(); |
| private boolean callHalfClosed = false; |
| |
| @Override |
| public void onMessage(WriteResponse response) { |
| // TODO(buchgr): The ByteStream API allows to resume the upload at the committedSize. |
| } |
| |
| @Override |
| public void onClose(Status status, Metadata trailers) { |
| if (status.isOk()) { |
| listener.success(); |
| } else { |
| listener.failure(status); |
| } |
| } |
| |
| @Override |
| public void onReady() { |
| while (call.isReady()) { |
| if (!chunker.hasNext()) { |
| // call.halfClose() may only be called once. Guard against it being called more |
| // often. |
| // See: https://github.com/grpc/grpc-java/issues/3201 |
| if (!callHalfClosed) { |
| callHalfClosed = true; |
| // Every chunk has been written. No more work to do. |
| call.halfClose(); |
| } |
| return; |
| } |
| |
| try { |
| requestBuilder.clear(); |
| Chunker.Chunk chunk = chunker.next(); |
| |
| if (chunk.getOffset() == 0) { |
| // Resource name only needs to be set on the first write for each file. |
| requestBuilder.setResourceName(newResourceName(chunk.getDigest())); |
| } |
| |
| boolean isLastChunk = !chunker.hasNext(); |
| WriteRequest request = |
| requestBuilder |
| .setData(chunk.getData()) |
| .setWriteOffset(chunk.getOffset()) |
| .setFinishWrite(isLastChunk) |
| .build(); |
| |
| call.sendMessage(request); |
| } catch (IOException e) { |
| try { |
| chunker.reset(); |
| } catch (IOException e1) { |
| // This exception indicates that closing the underlying input stream failed. |
| // We don't expect this to ever happen, but don't want to swallow the exception |
| // completely. |
| logger.log(Level.WARNING, "Chunker failed closing data source.", e1); |
| } finally { |
| call.cancel("Failed to read next chunk.", e); |
| } |
| } |
| } |
| } |
| |
| private String newResourceName(Digest digest) { |
| String resourceName = |
| format( |
| "uploads/%s/blobs/%s/%d", |
| UUID.randomUUID(), digest.getHash(), digest.getSizeBytes()); |
| if (!Strings.isNullOrEmpty(instanceName)) { |
| resourceName = instanceName + "/" + resourceName; |
| } |
| return resourceName; |
| } |
| }; |
| call.start(callListener, TracingMetadataUtils.headersFromCurrentContext()); |
| call.request(1); |
| } |
| |
| void cancel() { |
| if (call != null) { |
| call.cancel("Cancelled by user.", null); |
| } |
| } |
| } |
| } |