// blob: 08a2f2dfa6a81b0400e45fbacac4e7fa9827a60a [file] [log] [blame]
// Copyright 2018 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.remote.blobstore.http;
import com.google.auth.Credentials;
import com.google.devtools.build.lib.remote.blobstore.SimpleBlobStore;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.pool.ChannelPool;
import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.channel.pool.SimpleChannelPool;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestEncoder;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseDecoder;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.ssl.OpenSsl;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.ssl.SslProvider;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.internal.PlatformDependent;
import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.net.ssl.SSLEngine;
/**
 * Implementation of {@link SimpleBlobStore} that can talk to a HTTP/1.1 backend.
 *
 * <p>Blobs (Binary large objects) are uploaded using the {@code PUT} method. Action cache blobs are
 * stored under the path {@code /ac/base16-key}. CAS (Content Addressable Storage) blobs are stored
 * under the path {@code /cas/base16-key}. Valid status codes for a successful upload are 200 (OK),
 * 201 (CREATED), 202 (ACCEPTED) and 204 (NO CONTENT). It's recommended to return 200 (OK) on
 * success. The other status codes are supported for compatibility with the nginx webdav module
 * and may be removed in the future.
 *
 * <p>Blobs are downloaded using the {@code GET} method at the paths they were stored at. A status
 * code of 200 should be followed by the content of the blob. The status codes 404 (NOT FOUND) and
 * 204 (NO CONTENT) indicate that no cache entry exists. It's recommended to return 404 (NOT FOUND)
 * as the 204 (NO CONTENT) status code is only supported for compatibility with the nginx webdav
 * module.
 *
 * <p>TLS is supported and enabled automatically when using HTTPS as the URI scheme.
 *
 * <p>Uploads do not use {@code Expect: 100-CONTINUE} headers, as this would incur an additional
 * roundtrip for every upload and with little practical value as we would expect most uploads to be
 * accepted.
 *
 * <p>The implementation currently does not support transfer encoding chunked.
 *
 * <p>If a request fails because the auth token appears to have expired, the credentials are
 * refreshed and the request is retried exactly once.
 */
public final class HttpBlobStore implements SimpleBlobStore {

  /** Matches the {@code error="invalid_token"} attribute of a Bearer challenge (RFC 6750). */
  private static final Pattern INVALID_TOKEN_ERROR =
      Pattern.compile("\\s*error\\s*=\\s*\"?invalid_token\"?");

  private final NioEventLoopGroup eventLoop;
  private final ChannelPool downloadChannels;
  private final ChannelPool uploadChannels;
  private final URI uri;

  private final Object credentialsLock = new Object();

  @GuardedBy("credentialsLock")
  private final Credentials creds;

  @GuardedBy("credentialsLock")
  private long lastRefreshTime;

  /**
   * Creates a blob store that talks to the HTTP(S) endpoint identified by {@code uri}.
   *
   * @param uri base URI of the cache; if it has no explicit port, 443 is used for {@code https}
   *     and 80 otherwise
   * @param timeoutMillis timeout, in milliseconds, applied to the channel options below
   * @param creds credentials handed to the upload/download handlers, or {@code null} for none
   * @throws IllegalArgumentException if {@code uri} has no scheme
   * @throws Exception if the normalized URI or the TLS context cannot be created
   */
  public HttpBlobStore(URI uri, int timeoutMillis, @Nullable final Credentials creds)
      throws Exception {
    String scheme = uri.getScheme();
    if (scheme == null) {
      throw new IllegalArgumentException("Remote cache URI has no scheme: " + uri);
    }
    // URI schemes are case-insensitive (RFC 3986, section 3.1).
    boolean useTls = scheme.equalsIgnoreCase("https");
    if (uri.getPort() == -1) {
      int port = useTls ? 443 : 80;
      uri =
          new URI(
              scheme,
              uri.getUserInfo(),
              uri.getHost(),
              port,
              uri.getPath(),
              uri.getQuery(),
              uri.getFragment());
    }
    this.uri = uri;
    this.creds = creds;

    final SslContext sslCtx;
    if (useTls) {
      // OpenSsl gives us a > 2x speed improvement on fast networks, but requires netty tcnative
      // to be there which is not available on all platforms and environments.
      SslProvider sslProvider = OpenSsl.isAvailable() ? SslProvider.OPENSSL : SslProvider.JDK;
      sslCtx = SslContextBuilder.forClient().sslProvider(sslProvider).build();
    } else {
      sslCtx = null;
    }

    // The event loop is created only after every step above that can throw has succeeded, so a
    // failing constructor does not leave an event loop group behind that nobody shuts down.
    eventLoop = new NioEventLoopGroup(2 /* number of threads */);

    // NOTE(review): SO_TIMEOUT is believed to be honored only by Netty's blocking (OIO)
    // transport; confirm that it has any effect on NioSocketChannel. If it does not, read
    // timeouts are currently not enforced.
    Bootstrap clientBootstrap =
        new Bootstrap()
            .channel(NioSocketChannel.class)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeoutMillis)
            .option(ChannelOption.SO_TIMEOUT, timeoutMillis)
            .group(eventLoop)
            .remoteAddress(uri.getHost(), uri.getPort());

    downloadChannels =
        new SimpleChannelPool(
            clientBootstrap,
            new ChannelPoolHandler() {
              @Override
              public void channelReleased(Channel ch) {}

              @Override
              public void channelAcquired(Channel ch) {}

              @Override
              public void channelCreated(Channel ch) {
                addTlsHandlerIfNeeded(ch, sslCtx);
                ChannelPipeline p = ch.pipeline();
                p.addLast(new HttpClientCodec());
                p.addLast(new HttpDownloadHandler(creds));
              }
            });

    uploadChannels =
        new SimpleChannelPool(
            clientBootstrap,
            new ChannelPoolHandler() {
              @Override
              public void channelReleased(Channel ch) {}

              @Override
              public void channelAcquired(Channel ch) {}

              @Override
              public void channelCreated(Channel ch) {
                addTlsHandlerIfNeeded(ch, sslCtx);
                ChannelPipeline p = ch.pipeline();
                p.addLast(new HttpResponseDecoder());
                // The 10KiB limit was chosen at random. We only expect HTTP servers to respond
                // with an error message in the body and that should always be less than 10KiB.
                p.addLast(new HttpObjectAggregator(10 * 1024));
                p.addLast(new HttpRequestEncoder());
                p.addLast(new ChunkedWriteHandler());
                p.addLast(new HttpUploadHandler(creds));
              }
            });
  }

  /**
   * Installs a client-mode {@link SslHandler} at the head of the channel's pipeline when TLS is in
   * use; does nothing when {@code sslCtx} is {@code null}.
   */
  private static void addTlsHandlerIfNeeded(Channel ch, @Nullable SslContext sslCtx) {
    if (sslCtx == null) {
      return;
    }
    // NOTE(review): the engine is created without peer host/port information; verify whether
    // SNI and hostname verification are required for the backends this is used with.
    SSLEngine engine = sslCtx.newEngine(ch.alloc());
    engine.setUseClientMode(true);
    ch.pipeline().addFirst(new SslHandler(engine));
  }

  /** Not supported by this implementation; always throws. */
  @Override
  public boolean containsKey(String key) throws IOException, InterruptedException {
    throw new UnsupportedOperationException("HTTP Caching does not use this method.");
  }

  /**
   * Downloads the CAS blob stored under {@code key} into {@code out}.
   *
   * @return {@code true} if the blob was downloaded, {@code false} on a cache miss
   */
  @Override
  public boolean get(String key, OutputStream out) throws IOException, InterruptedException {
    return get(key, out, true);
  }

  /**
   * Downloads a blob into {@code out}, retrying once after a credential refresh if the server
   * reports an expired auth token and nothing has been written to {@code out} yet.
   *
   * @param casDownload passed through to {@link DownloadCommand}; {@code true} for CAS blobs,
   *     {@code false} for action results
   * @return {@code true} if the blob was downloaded, {@code false} on a cache miss
   */
  @SuppressWarnings("FutureReturnValueIgnored")
  private boolean get(String key, final OutputStream out, boolean casDownload)
      throws IOException, InterruptedException {
    final AtomicBoolean dataWritten = new AtomicBoolean();
    OutputStream wrappedOut =
        new OutputStream() {
          // OutputStream.close() does nothing, which is what we want to ensure that the
          // OutputStream can't be closed somewhere in the Netty pipeline, so that we can support
          // retries. This method never closes 'out'; that is left to the caller.

          @Override
          public void write(byte[] b, int offset, int length) throws IOException {
            dataWritten.set(true);
            out.write(b, offset, length);
          }

          @Override
          public void write(int b) throws IOException {
            dataWritten.set(true);
            out.write(b);
          }

          @Override
          public void flush() throws IOException {
            out.flush();
          }
        };
    DownloadCommand download = new DownloadCommand(uri, casDownload, key, wrappedOut);
    Channel ch = null;
    try {
      ch = acquire(downloadChannels);
      ChannelFuture downloadFuture = ch.writeAndFlush(download);
      downloadFuture.sync();
      return true;
    } catch (Exception e) {
      // e can be of type HttpException, because Netty uses Unsafe.throwException to re-throw a
      // checked exception that hasn't been declared in the method signature.
      if (e instanceof HttpException) {
        HttpResponse response = ((HttpException) e).response();
        // Only retry if no bytes reached 'out'; otherwise the caller would see duplicated data.
        if (!dataWritten.get() && authTokenExpired(response)) {
          // The error is due to an auth token having expired. Let's try again.
          refreshCredentials();
          return getAfterCredentialRefresh(download);
        }
        if (cacheMiss(response.status())) {
          return false;
        }
      }
      throw e;
    } finally {
      if (ch != null) {
        downloadChannels.release(ch);
      }
    }
  }

  /**
   * Re-issues {@code cmd} on a newly acquired download channel. Unlike {@link #get(String,
   * OutputStream, boolean)} this performs no further credential refresh.
   *
   * @return {@code true} if the blob was downloaded, {@code false} on a cache miss
   */
  @SuppressWarnings("FutureReturnValueIgnored")
  private boolean getAfterCredentialRefresh(DownloadCommand cmd) throws InterruptedException {
    Channel ch = null;
    try {
      ch = acquire(downloadChannels);
      ChannelFuture downloadFuture = ch.writeAndFlush(cmd);
      downloadFuture.sync();
      return true;
    } catch (Exception e) {
      if (e instanceof HttpException) {
        HttpResponse response = ((HttpException) e).response();
        if (cacheMiss(response.status())) {
          return false;
        }
      }
      throw e;
    } finally {
      if (ch != null) {
        downloadChannels.release(ch);
      }
    }
  }

  /**
   * Downloads the action result stored under {@code actionKey} into {@code out}.
   *
   * @return {@code true} if the entry was downloaded, {@code false} on a cache miss
   */
  @Override
  public boolean getActionResult(String actionKey, OutputStream out)
      throws IOException, InterruptedException {
    return get(actionKey, out, false);
  }

  /** Uploads {@code length} bytes read from {@code in} as the CAS blob {@code key}. */
  @Override
  public void put(String key, long length, InputStream in)
      throws IOException, InterruptedException {
    put(key, length, in, true);
  }

  /**
   * Uploads a blob, retrying once after a credential refresh if the server reports an expired
   * auth token and {@code in} can be rewound. {@code in} is always closed before returning.
   *
   * @param casUpload passed through to {@link UploadCommand}; {@code true} for CAS blobs, {@code
   *     false} for action results
   */
  @SuppressWarnings("FutureReturnValueIgnored")
  private void put(String key, long length, InputStream in, boolean casUpload)
      throws IOException, InterruptedException {
    InputStream wrappedIn =
        new FilterInputStream(in) {
          @Override
          public void close() {
            // Ensure that the InputStream can't be closed somewhere in the Netty
            // pipeline, so that we can support retries. The InputStream is closed in
            // the finally block below.
          }
        };
    UploadCommand upload = new UploadCommand(uri, casUpload, key, wrappedIn, length);
    Channel ch = null;
    try {
      ch = acquire(uploadChannels);
      ChannelFuture uploadFuture = ch.writeAndFlush(upload);
      uploadFuture.sync();
    } catch (Exception e) {
      // e can be of type HttpException, because Netty uses Unsafe.throwException to re-throw a
      // checked exception that hasn't been declared in the method signature.
      if (e instanceof HttpException) {
        HttpResponse response = ((HttpException) e).response();
        if (authTokenExpired(response)) {
          refreshCredentials();
          // The error is due to an auth token having expired. Let's try again.
          if (!reset(in)) {
            // The InputStream can't be reset and thus we can't retry as most likely
            // bytes have already been read from the InputStream.
            throw e;
          }
          putAfterCredentialRefresh(upload);
          return;
        }
      }
      throw e;
    } finally {
      try {
        in.close();
      } finally {
        // Release the channel even if closing the stream throws, so it is not leaked.
        if (ch != null) {
          uploadChannels.release(ch);
        }
      }
    }
  }

  /** Re-issues {@code cmd} on a newly acquired upload channel; errors are not retried again. */
  @SuppressWarnings("FutureReturnValueIgnored")
  private void putAfterCredentialRefresh(UploadCommand cmd) throws InterruptedException {
    Channel ch = null;
    try {
      ch = acquire(uploadChannels);
      ChannelFuture uploadFuture = ch.writeAndFlush(cmd);
      uploadFuture.sync();
    } finally {
      if (ch != null) {
        uploadChannels.release(ch);
      }
    }
  }

  /**
   * Tries to rewind {@code in} so that an upload can be retried.
   *
   * @return {@code true} if the stream was rewound, {@code false} if it supports neither
   *     mark/reset nor is a {@link FileInputStream}
   * @throws IOException if rewinding the stream fails
   */
  private static boolean reset(InputStream in) throws IOException {
    if (in.markSupported()) {
      in.reset();
      return true;
    }
    if (in instanceof FileInputStream) {
      // FileInputStream does not support reset().
      ((FileInputStream) in).getChannel().position(0);
      return true;
    }
    return false;
  }

  /** Uploads {@code data} as the action result for {@code actionKey}. */
  @Override
  public void putActionResult(String actionKey, byte[] data)
      throws IOException, InterruptedException {
    try (InputStream in = new ByteArrayInputStream(data)) {
      put(actionKey, data.length, in, false);
    }
  }

  /**
   * It's safe to suppress this warning because all methods on Netty futures return {@code this}. So
   * we are not ignoring anything.
   */
  @SuppressWarnings("FutureReturnValueIgnored")
  @Override
  public void close() {
    downloadChannels.close();
    uploadChannels.close();
    eventLoop.shutdownGracefully();
  }

  /** Returns whether {@code status} signals that no cache entry exists. */
  private static boolean cacheMiss(HttpResponseStatus status) {
    // Supporting NO_CONTENT for nginx webdav compatibility.
    return status.equals(HttpResponseStatus.NOT_FOUND)
        || status.equals(HttpResponseStatus.NO_CONTENT);
  }

  /**
   * Returns whether {@code response} indicates that the auth token has expired. Always {@code
   * false} when no credentials were configured.
   *
   * <p>See https://tools.ietf.org/html/rfc6750#section-3.1
   */
  private boolean authTokenExpired(HttpResponse response) {
    synchronized (credentialsLock) {
      if (creds == null) {
        return false;
      }
    }
    List<String> values = response.headers().getAllAsString(HttpHeaderNames.WWW_AUTHENTICATE);
    // String.join never returns null; with no header present the result is the empty string.
    String value = String.join(",", values);
    if (value.startsWith("Bearer")) {
      return INVALID_TOKEN_ERROR.matcher(value).find();
    }
    return response.status().equals(HttpResponseStatus.UNAUTHORIZED);
  }

  /**
   * Blocks until a channel from {@code pool} is available and returns it. If acquisition fails,
   * the underlying cause is re-thrown as-is, even if it is a checked exception.
   */
  private static Channel acquire(ChannelPool pool) throws InterruptedException {
    try {
      return pool.acquire().get();
    } catch (ExecutionException e) {
      PlatformDependent.throwException(e.getCause());
      // Unreachable: throwException always throws, but the compiler can't know that.
      return null;
    }
  }

  /**
   * Refreshes the credentials, unless they were already refreshed within the last second. Must
   * only be called when credentials were configured.
   */
  private void refreshCredentials() throws IOException {
    synchronized (credentialsLock) {
      long now = System.currentTimeMillis();
      // Call creds.refresh() at most once per second. The one second was arbitrarily chosen, as
      // a small enough value that we don't expect to interfere with actual token lifetimes, but
      // it should just make sure that potentially hundreds of threads don't call this method
      // at the same time.
      if ((now - lastRefreshTime) > TimeUnit.SECONDS.toMillis(1)) {
        lastRefreshTime = now;
        creds.refresh();
      }
    }
  }
}