Add (inactive) code to compress large GenQuery results in memory on the fly
Dynamically switches to gzipping GenQuery results once they reach 1 MiB (1 << 20 bytes), trading CPU for memory.
Compression is hard-coded to off for now, until it is connected to a flag.
PiperOrigin-RevId: 258568242
diff --git a/src/main/java/com/google/devtools/build/lib/rules/genquery/GenQuery.java b/src/main/java/com/google/devtools/build/lib/rules/genquery/GenQuery.java
index 118e783..190aa39 100644
--- a/src/main/java/com/google/devtools/build/lib/rules/genquery/GenQuery.java
+++ b/src/main/java/com/google/devtools/build/lib/rules/genquery/GenQuery.java
@@ -36,7 +36,7 @@
import com.google.devtools.build.lib.analysis.Runfiles;
import com.google.devtools.build.lib.analysis.RunfilesProvider;
import com.google.devtools.build.lib.analysis.actions.AbstractFileWriteAction;
-import com.google.devtools.build.lib.analysis.actions.ByteStringDeterministicWriter;
+import com.google.devtools.build.lib.analysis.actions.AbstractFileWriteAction.DeterministicWriter;
import com.google.devtools.build.lib.analysis.config.CoreOptions;
import com.google.devtools.build.lib.cmdline.Label;
import com.google.devtools.build.lib.cmdline.PackageIdentifier;
@@ -73,6 +73,7 @@
import com.google.devtools.build.lib.query2.query.output.QueryOptions;
import com.google.devtools.build.lib.query2.query.output.QueryOptions.OrderOutput;
import com.google.devtools.build.lib.query2.query.output.QueryOutputUtils;
+import com.google.devtools.build.lib.rules.genquery.GenQueryOutputStream.GenQueryResult;
import com.google.devtools.build.lib.runtime.KeepGoingOption;
import com.google.devtools.build.lib.skyframe.PackageValue;
import com.google.devtools.build.lib.skyframe.TargetPatternValue;
@@ -92,6 +93,7 @@
import com.google.devtools.common.options.OptionsParsingException;
import com.google.protobuf.ByteString;
import java.io.IOException;
+import java.io.OutputStream;
import java.nio.channels.ClosedByInterruptException;
import java.util.Collection;
import java.util.HashSet;
@@ -117,6 +119,7 @@
// The query string
final String query = ruleContext.attributes().get("expression", Type.STRING);
+ @SuppressWarnings("unchecked")
OptionsParser optionsParser =
OptionsParser.builder()
.optionsClasses(QueryOptions.class, KeepGoingOption.class)
@@ -169,7 +172,7 @@
// force relative_locations to true so it has a deterministic output across machines.
queryOptions.relativeLocations = true;
- ByteString result;
+ GenQueryResult result;
try (SilentCloseable c =
Profiler.instance().profile("GenQuery.executeQuery/" + ruleContext.getLabel())) {
result =
@@ -193,11 +196,14 @@
NestedSet<Artifact> filesToBuild = NestedSetBuilder.create(Order.STABLE_ORDER, outputArtifact);
return new RuleConfiguredTargetBuilder(ruleContext)
.setFilesToBuild(filesToBuild)
- .add(RunfilesProvider.class, RunfilesProvider.simple(
- new Runfiles.Builder(
- ruleContext.getWorkspaceName(),
- ruleContext.getConfiguration().legacyExternalRunfiles())
- .addTransitiveArtifacts(filesToBuild).build()))
+ .addProvider(
+ RunfilesProvider.class,
+ RunfilesProvider.simple(
+ new Runfiles.Builder(
+ ruleContext.getWorkspaceName(),
+ ruleContext.getConfiguration().legacyExternalRunfiles())
+ .addTransitiveArtifacts(filesToBuild)
+ .build()))
.build();
}
@@ -273,7 +279,7 @@
}
@Nullable
- private ByteString executeQuery(
+ private GenQueryResult executeQuery(
RuleContext ruleContext, QueryOptions queryOptions, Collection<Label> scope, String query)
throws InterruptedException {
SkyFunction.Environment env = ruleContext.getAnalysisEnvironment().getSkyframeEnv();
@@ -300,7 +306,7 @@
@SuppressWarnings("unchecked")
@Nullable
- private ByteString doQuery(
+ private GenQueryResult doQuery(
QueryOptions queryOptions,
PreloadedMapPackageProvider packageProvider,
Predicate<Label> labelFilter,
@@ -372,37 +378,39 @@
throw new RuntimeException(e);
}
- ByteString.Output outputStream = ByteString.newOutput();
+ // TODO(b/137379942): Enable compression.
+ GenQueryOutputStream outputStream = new GenQueryOutputStream(/*compressionEnabled=*/ false);
try {
QueryOutputUtils
.output(queryOptions, queryResult, targets.getResult(), formatter, outputStream,
queryOptions.aspectDeps.createResolver(packageProvider, getEventHandler(ruleContext)));
+ outputStream.close();
} catch (ClosedByInterruptException e) {
throw new InterruptedException(e.getMessage());
} catch (IOException e) {
throw new RuntimeException(e);
}
- return outputStream.toByteString();
+ return outputStream.getResult();
}
@Immutable // assuming no other reference to result
private static final class QueryResultAction extends AbstractFileWriteAction {
- private final ByteString result;
+ private final GenQueryResult result;
- private QueryResultAction(ActionOwner owner, Artifact output, ByteString result) {
+ private QueryResultAction(ActionOwner owner, Artifact output, GenQueryResult result) {
super(owner, ImmutableList.<Artifact>of(), output, /*makeExecutable=*/false);
this.result = result;
}
@Override
public DeterministicWriter newDeterministicWriter(ActionExecutionContext ctx) {
- return new ByteStringDeterministicWriter(result);
+ return new GenQueryResultWriter(result);
}
@Override
protected void computeKey(ActionKeyContext actionKeyContext, Fingerprint fp) {
- fp.addBytes(result);
+ result.fingerprint(fp);
}
}
@@ -575,4 +583,22 @@
super(message);
}
}
+  /** Adapts a {@link GenQueryResult} to the {@link DeterministicWriter} interface. */
+  private static class GenQueryResultWriter implements DeterministicWriter {
+    private final GenQueryResult genQueryResult; // backs both writeOutputFile and getBytes
+
+    GenQueryResultWriter(GenQueryResult genQueryResult) {
+      this.genQueryResult = genQueryResult;
+    }
+
+    @Override
+    public void writeOutputFile(OutputStream out) throws IOException {
+      genQueryResult.writeTo(out); // streams the output without requiring a full ByteString
+    }
+
+    @Override
+    public ByteString getBytes() throws IOException {
+      return genQueryResult.getBytes();
+    }
+  }
}
diff --git a/src/main/java/com/google/devtools/build/lib/rules/genquery/GenQueryOutputStream.java b/src/main/java/com/google/devtools/build/lib/rules/genquery/GenQueryOutputStream.java
new file mode 100644
index 0000000..0e1b36a
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/rules/genquery/GenQueryOutputStream.java
@@ -0,0 +1,261 @@
+// Copyright 2019 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.rules.genquery;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.io.ByteStreams;
+import com.google.devtools.build.lib.util.Fingerprint;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * {@link OutputStream} implementation optimized for {@link GenQuery} by (optionally) compressing
+ * query results on the fly. Produces {@link GenQueryResult}s which are preferred for storing the
+ * output of {@link GenQuery}'s underlying queries.
+ *
+ * <p>All output is held in memory. Bytes are buffered uncompressed until, with compression
+ * enabled, a write would bring the total to {@code COMPRESSION_THRESHOLD} bytes or more; at that
+ * point the bytes buffered so far are re-encoded as gzip and all later writes are gzipped too.
+ * Callers must {@link #close} the stream before calling {@link #getResult}.
+ *
+ * <p>This class performs no synchronization.
+ */
+class GenQueryOutputStream extends OutputStream {
+
+  /**
+   * When compression is enabled, the threshold at which the stream will switch to compressing
+   * output. The value of this constant is arbitrary but effective.
+   */
+  private static final int COMPRESSION_THRESHOLD = 1 << 20;
+
+  /**
+   * Encapsulates the output of a {@link GenQuery}'s query. CPU and memory overhead of individual
+   * methods depends on the underlying content and settings.
+   */
+  interface GenQueryResult {
+    /** Returns the query output as a {@link ByteString}. */
+    ByteString getBytes() throws IOException;
+
+    /**
+     * Adds the query output to the supplied {@link Fingerprint}. Equivalent to {@code
+     * fingerprint.addBytes(genQueryResult.getBytes())}, but potentially more efficient.
+     */
+    void fingerprint(Fingerprint fingerprint);
+
+    /**
+     * Returns the size of the output. This must be a constant-time operation for all
+     * implementations.
+     */
+    int size();
+
+    /**
+     * Writes the query output to the provided {@link OutputStream}. Equivalent to {@code
+     * genQueryResult.getBytes().writeTo(out)}, but potentially more efficient.
+     */
+    void writeTo(OutputStream out) throws IOException;
+  }
+
+  /** Whether this stream may switch to gzip once the threshold is reached. */
+  private final boolean compressionEnabled;
+
+  /** Number of bytes passed to the {@code write} methods so far, i.e. the uncompressed size. */
+  private int bytesWritten = 0;
+
+  /** Whether the stream has switched to gzip; never reset once set. */
+  private boolean compressed = false;
+
+  /** Set by {@link #close}; {@link #getResult} fails until then. */
+  private boolean closed = false;
+
+  /** In-memory sink for the stored bytes: raw output, or gzip data once compressing. */
+  private ByteString.Output bytesOut = ByteString.newOutput();
+
+  /** Target of all writes: {@link #bytesOut} itself, or a gzip stream feeding it. */
+  private OutputStream out = bytesOut;
+
+  /**
+   * Creates an empty stream.
+   *
+   * @param compressionEnabled whether output may be gzipped once it reaches the threshold; when
+   *     false the output is always stored uncompressed
+   */
+  GenQueryOutputStream(boolean compressionEnabled) {
+    this.compressionEnabled = compressionEnabled;
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    maybeStartCompression(1);
+    out.write(b);
+    bytesWritten += 1;
+  }
+
+  @Override
+  public void write(byte[] bytes) throws IOException {
+    write(bytes, 0, bytes.length);
+  }
+
+  @Override
+  public void write(byte[] bytes, int off, int len) throws IOException {
+    // Decide about compression first so the chunk that crosses the threshold already goes
+    // through the gzip stream.
+    maybeStartCompression(len);
+    out.write(bytes, off, len);
+    bytesWritten += len;
+  }
+
+  @Override
+  public void flush() throws IOException {
+    out.flush();
+  }
+
+  @Override
+  public void close() throws IOException {
+    // When compressing, closing the gzip stream is what completes the stored data.
+    out.close();
+    closed = true;
+  }
+
+  /**
+   * Returns the accumulated output: a {@link CompressedResult} if the stream switched to gzip,
+   * otherwise a {@link RegularResult}.
+   *
+   * @throws IllegalStateException if the stream has not been closed yet
+   */
+  GenQueryResult getResult() {
+    Preconditions.checkState(closed, "Must be closed");
+    ByteString stored = bytesOut.toByteString();
+    return compressed ? new CompressedResult(stored, bytesWritten) : new RegularResult(stored);
+  }
+
+  /**
+   * Switches to gzip if compression is enabled, has not started yet, and the pending write would
+   * bring the total to at least {@link #COMPRESSION_THRESHOLD} bytes. The bytes buffered so far
+   * are copied through the new gzip stream so the stored data forms a single gzip stream.
+   *
+   * @param additionalBytes number of bytes about to be written
+   */
+  private void maybeStartCompression(int additionalBytes) throws IOException {
+    if (!compressionEnabled || compressed) {
+      return;
+    }
+    if (bytesWritten + additionalBytes < COMPRESSION_THRESHOLD) {
+      return;
+    }
+
+    ByteString.Output compressedBytesOut = ByteString.newOutput();
+    GZIPOutputStream gzipOut = new GZIPOutputStream(compressedBytesOut);
+    bytesOut.writeTo(gzipOut);
+    // Swap the fields only after the copy succeeded.
+    bytesOut = compressedBytesOut;
+    out = gzipOut;
+    compressed = true;
+  }
+
+  /** Result backed directly by the uncompressed output bytes. */
+  @VisibleForTesting
+  static class RegularResult implements GenQueryResult {
+    private final ByteString data;
+
+    RegularResult(ByteString data) {
+      this.data = data;
+    }
+
+    @Override
+    public ByteString getBytes() {
+      return data;
+    }
+
+    @Override
+    public int size() {
+      return data.size();
+    }
+
+    @Override
+    public void fingerprint(Fingerprint fingerprint) {
+      fingerprint.addBytes(data);
+    }
+
+    @Override
+    public void writeTo(OutputStream out) throws IOException {
+      data.writeTo(out);
+    }
+  }
+
+  /** Result that stores the output gzip-compressed and decompresses it on each access. */
+  @VisibleForTesting
+  static class CompressedResult implements GenQueryResult {
+    /** Buffer size used when streaming decompressed bytes into a {@link Fingerprint}. */
+    private static final int FINGERPRINT_CHUNK_SIZE = 4092;
+
+    private final ByteString compressedData;
+    private final int size;
+
+    /**
+     * Creates a result from gzip data.
+     *
+     * @param compressedData the gzip-compressed query output
+     * @param size the size of the output before compression, in bytes
+     */
+    CompressedResult(ByteString compressedData, int size) {
+      this.compressedData = compressedData;
+      this.size = size;
+    }
+
+    @Override
+    public ByteString getBytes() throws IOException {
+      // The uncompressed size is known up front, so use it as the initial capacity.
+      ByteString.Output out = ByteString.newOutput(size);
+      writeTo(out);
+      return out.toByteString();
+    }
+
+    @Override
+    public int size() {
+      return size;
+    }
+
+    @Override
+    public void writeTo(OutputStream out) throws IOException {
+      try (GZIPInputStream gzipIn = newDecompressor()) {
+        ByteStreams.copy(gzipIn, out);
+      }
+    }
+
+    @Override
+    public void fingerprint(Fingerprint fingerprint) {
+      try (GZIPInputStream gzipIn = newDecompressor()) {
+        byte[] chunk = new byte[FINGERPRINT_CHUNK_SIZE];
+        int bytesRead;
+        // InputStream#read reports end of stream as -1.
+        while ((bytesRead = gzipIn.read(chunk)) != -1) {
+          fingerprint.addBytes(chunk, 0, bytesRead);
+        }
+      } catch (IOException e) {
+        // Unexpected, everything should be in memory!
+        throw new IllegalStateException("Unexpected IOException", e);
+      }
+    }
+
+    /** Opens a new stream that yields the decompressed output from its beginning. */
+    private GZIPInputStream newDecompressor() throws IOException {
+      return new GZIPInputStream(compressedData.newInput());
+    }
+  }
+}
diff --git a/src/test/java/com/google/devtools/build/lib/rules/genquery/GenQueryOutputStreamTest.java b/src/test/java/com/google/devtools/build/lib/rules/genquery/GenQueryOutputStreamTest.java
new file mode 100644
index 0000000..e69d5393
--- /dev/null
+++ b/src/test/java/com/google/devtools/build/lib/rules/genquery/GenQueryOutputStreamTest.java
@@ -0,0 +1,142 @@
+// Copyright 2019 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.rules.genquery;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.common.base.Strings;
+import com.google.devtools.build.lib.rules.genquery.GenQueryOutputStream.GenQueryResult;
+import com.google.devtools.build.lib.util.Fingerprint;
+import com.google.protobuf.ByteString;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link GenQueryOutputStream}. */
+@RunWith(JUnit4.class)
+public class GenQueryOutputStreamTest {
+
+  @Test
+  public void testSmallOutputMultibyteWriteWithCompressionEnabled() throws IOException {
+    runMultibyteWriteTest(
+        Strings.repeat("xyz", 10_000),
+        /*compressionEnabled=*/ true,
+        GenQueryOutputStream.RegularResult.class);
+  }
+
+  @Test
+  public void testSmallOutputMultibyteWriteWithCompressionDisabled() throws IOException {
+    runMultibyteWriteTest(
+        Strings.repeat("xyz", 10_000),
+        /*compressionEnabled=*/ false,
+        GenQueryOutputStream.RegularResult.class);
+  }
+
+  @Test
+  public void testBigOutputMultibyteWriteWithCompressionEnabled() throws IOException {
+    runMultibyteWriteTest(
+        Strings.repeat("xyz", 1_000_000),
+        /*compressionEnabled=*/ true,
+        GenQueryOutputStream.CompressedResult.class);
+  }
+
+  @Test
+  public void testBigOutputMultibyteWriteWithCompressionDisabled() throws IOException {
+    runMultibyteWriteTest(
+        Strings.repeat("xyz", 1_000_000),
+        /*compressionEnabled=*/ false,
+        GenQueryOutputStream.RegularResult.class);
+  }
+
+  @Test
+  public void testSmallOutputSingleByteWritesWithCompressionEnabled() throws IOException {
+    runSingleByteWriteTest(
+        Strings.repeat("xyz", 10_000),
+        /*compressionEnabled=*/ true,
+        GenQueryOutputStream.RegularResult.class);
+  }
+
+  @Test
+  public void testSmallOutputSingleByteWritesWithCompressionDisabled() throws IOException {
+    runSingleByteWriteTest(
+        Strings.repeat("xyz", 10_000),
+        /*compressionEnabled=*/ false,
+        GenQueryOutputStream.RegularResult.class);
+  }
+
+  @Test
+  public void testBigOutputSingleByteWritesWithCompressionEnabled() throws IOException {
+    runSingleByteWriteTest(
+        Strings.repeat("xyz", 1_000_000),
+        /*compressionEnabled=*/ true,
+        GenQueryOutputStream.CompressedResult.class);
+  }
+
+  @Test
+  public void testBigOutputSingleByteWritesWithCompressionDisabled() throws IOException {
+    runSingleByteWriteTest(
+        Strings.repeat("xyz", 1_000_000),
+        /*compressionEnabled=*/ false,
+        GenQueryOutputStream.RegularResult.class);
+  }
+
+  /** Writes {@code data} with one bulk write, then checks the resulting {@link GenQueryResult}. */
+  private static void runMultibyteWriteTest(
+      String data, boolean compressionEnabled, Class<? extends GenQueryResult> resultClass)
+      throws IOException {
+    GenQueryOutputStream underTest = new GenQueryOutputStream(compressionEnabled);
+    underTest.write(data.getBytes(StandardCharsets.UTF_8));
+    underTest.close();
+
+    verifyResult(underTest.getResult(), data, resultClass);
+  }
+
+  /** Writes {@code data} one byte at a time, then checks the resulting {@link GenQueryResult}. */
+  private static void runSingleByteWriteTest(
+      String data, boolean compressionEnabled, Class<? extends GenQueryResult> resultClass)
+      throws IOException {
+    GenQueryOutputStream underTest = new GenQueryOutputStream(compressionEnabled);
+    for (byte b : data.getBytes(StandardCharsets.UTF_8)) {
+      underTest.write(b);
+    }
+    underTest.close();
+
+    verifyResult(underTest.getResult(), data, resultClass);
+  }
+
+  /**
+   * Asserts that {@code result} has the expected implementation class and that its bytes, size,
+   * fingerprint and streamed output all match {@code data}.
+   */
+  private static void verifyResult(
+      GenQueryResult result, String data, Class<? extends GenQueryResult> resultClass)
+      throws IOException {
+    assertThat(result).isInstanceOf(resultClass);
+    assertThat(result.getBytes()).isEqualTo(ByteString.copyFromUtf8(data));
+    // size() counts bytes, so compare with the encoded length rather than the char count.
+    assertThat(result.size()).isEqualTo(data.getBytes(StandardCharsets.UTF_8).length);
+
+    Fingerprint fingerprint = new Fingerprint();
+    result.fingerprint(fingerprint);
+    assertThat(fingerprint.hexDigestAndReset()).isEqualTo(Fingerprint.getHexDigest(data));
+
+    ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+    result.writeTo(bytesOut);
+    assertThat(new String(bytesOut.toByteArray(), StandardCharsets.UTF_8)).isEqualTo(data);
+  }
+}