Make a best-effort attempt to avoid action cache journal corruption in the event of an abrupt exit.

Do this by implementing a special buffered output stream that only flushes at record boundaries. While this doesn't completely eliminate the possibility of corruption (that would require a different design), it makes it much less likely.

PiperOrigin-RevId: 742876110
Change-Id: Ia713e2e13555ce17986d7a8447b9f4a24964b2a1
diff --git a/src/main/java/com/google/devtools/build/lib/util/BUILD b/src/main/java/com/google/devtools/build/lib/util/BUILD
index dc15e1a..10c0441 100644
--- a/src/main/java/com/google/devtools/build/lib/util/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/util/BUILD
@@ -255,6 +255,7 @@
         ":string_encoding",
         "//src/main/java/com/google/devtools/build/lib/bugreport",
         "//src/main/java/com/google/devtools/build/lib/concurrent",
+        "//src/main/java/com/google/devtools/build/lib/util/io",
         "//src/main/java/com/google/devtools/build/lib/vfs",
         "//src/main/java/com/google/devtools/build/lib/vfs:pathfragment",
         "//src/main/java/com/google/devtools/common/options",
diff --git a/src/main/java/com/google/devtools/build/lib/util/MapCodec.java b/src/main/java/com/google/devtools/build/lib/util/MapCodec.java
index 35f4c23..78b6237 100644
--- a/src/main/java/com/google/devtools/build/lib/util/MapCodec.java
+++ b/src/main/java/com/google/devtools/build/lib/util/MapCodec.java
@@ -13,6 +13,7 @@
 // limitations under the License.
 package com.google.devtools.build.lib.util;
 
+import com.google.devtools.build.lib.util.io.RecordOutputStream;
 import com.google.devtools.build.lib.vfs.Path;
 import java.io.BufferedOutputStream;
 import java.io.ByteArrayInputStream;
@@ -22,6 +23,7 @@
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import javax.annotation.Nullable;
 
 /** Converts map entries between their in-memory and on-disk representations. */
@@ -143,35 +145,44 @@
    * @throws IOException if an I/O error occurs
    */
   public Writer createWriter(Path path, int version, boolean overwrite) throws IOException {
-    DataOutputStream out;
-    if (!overwrite && path.exists()) {
-      out =
-          new DataOutputStream(new BufferedOutputStream(path.getOutputStream(/* append= */ true)));
-    } else {
-      out = new DataOutputStream(new BufferedOutputStream(path.getOutputStream()));
-      out.writeLong(MAGIC);
-      out.writeLong(version);
+    boolean append = !overwrite && path.exists(); // Append only to a file that already exists.
+    RecordOutputStream recordOut = new RecordOutputStream(path.getOutputStream(append));
+    DataOutputStream dataOut = new DataOutputStream(recordOut);
+    if (!append) { // Not appending, so the file must start with a header.
+      dataOut.writeLong(MAGIC);
+      dataOut.writeLong(version); // The int version is widened and stored as a long.
+      recordOut.finishRecord(); // The header forms a complete record of its own.
     }
-    return new Writer(out);
+    return new Writer(recordOut, dataOut);
   }
 
-  /** Writes key/value pairs to a {@link DataOutputStream}. */
+  /**
+   * Writes key/value pairs to a {@link DataOutputStream} backed by a {@link RecordOutputStream}.
+   *
+   * <p>A {@link RecordOutputStream} is used instead of a {@link BufferedOutputStream} so that only
+   * complete records (the file header, or a single entry) are ever handed to the underlying
+   * unbuffered {@link OutputStream}. This is a best-effort attempt to prevent data corruption in
+   * the event of an abrupt exit: it can still be defeated by partial writes, although experiments
+   * suggest they're rather unlikely for small buffer sizes.
+   */
   public final class Writer implements AutoCloseable {
-    private final DataOutputStream out;
+    private final RecordOutputStream recordOut;
+    private final DataOutputStream dataOut;
 
-    private Writer(DataOutputStream out) {
-      this.out = out;
+    private Writer(RecordOutputStream recordOut, DataOutputStream dataOut) {
+      this.recordOut = recordOut; // Used to mark record boundaries and to flush/close.
+      this.dataOut = dataOut; // Encodes entries; createWriter makes it wrap recordOut.
     }
 
     /** Flushes the writer, forcing any pending writes to be written to disk. */
     public void flush() throws IOException {
-      out.flush();
+      recordOut.flush(); // Only complete records are written out; a partial one stays buffered.
     }
 
     /** Closes the writer, releasing associated resources and rendering it unusable. */
     @Override
     public void close() throws IOException {
-      out.close();
+      recordOut.close(); // Flushes complete records, then closes the underlying stream.
     }
 
     /**
@@ -181,13 +192,14 @@
      * @param value the value to write, or null to write a tombstone.
      */
     public void writeEntry(K key, @Nullable V value) throws IOException {
-      out.writeByte(ENTRY_MAGIC);
-      writeKey(key, out);
+      dataOut.writeByte(ENTRY_MAGIC);
+      writeKey(key, dataOut);
       boolean hasValue = value != null;
-      out.writeBoolean(hasValue);
+      dataOut.writeBoolean(hasValue); // A false flag denotes a tombstone (no value follows).
       if (hasValue) {
-        writeValue(value, out);
+        writeValue(value, dataOut);
       }
+      recordOut.finishRecord(); // The entry is complete; it may now be flushed as a whole.
     }
   }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/util/io/RecordOutputStream.java b/src/main/java/com/google/devtools/build/lib/util/io/RecordOutputStream.java
new file mode 100644
index 0000000..0c2379c
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/util/io/RecordOutputStream.java
@@ -0,0 +1,128 @@
+// Copyright 2025 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.util.io;
+
+import static com.google.common.math.IntMath.ceilingPowerOfTwo;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+
+/**
+ * A buffered output stream that only flushes its buffer at record boundaries.
+ *
+ * <p>The {@link #finishRecord} method marks the current position as the end of a complete record.
+ * Whenever a flush occurs (either explicitly via {@link #flush} or implicitly via {@link #write} or
+ * {@link #close}), the internal buffer is only flushed up to the last recorded position, with any
+ * following bytes remaining in the internal buffer. The internal buffer starts at 4KB but grows to
+ * accommodate the largest record seen so far.
+ *
+ * <p>Bytes written after the most recent call to {@link #finishRecord} are withheld from the
+ * underlying stream until their record is finished; if the stream is closed first, they are
+ * discarded. Flushing this stream does not flush the underlying stream.
+ *
+ * <p>This is intended as a best-effort attempt to prevent incomplete records from being written to
+ * disk in the event of an abrupt exit. It isn't completely safe since partial underlying writes are
+ * still possible, but experiments suggest that they're very unlikely for small buffer sizes.
+ */
+public final class RecordOutputStream extends OutputStream {
+  /** Initial size of the internal buffer, in bytes. */
+  private static final int INITIAL_BUFFER_SIZE = 4096;
+
+  private final OutputStream out;
+  private byte[] buf = new byte[INITIAL_BUFFER_SIZE];
+
+  /** Number of bytes currently held in {@link #buf}. */
+  private int writeOff = 0;
+
+  /** Length of the prefix of {@link #buf} that consists of complete records. */
+  private int flushOff = 0;
+
+  /**
+   * Creates a stream that forwards complete records to {@code out}.
+   *
+   * @param out the underlying stream, which is closed when this stream is closed
+   */
+  public RecordOutputStream(OutputStream out) {
+    this.out = out;
+  }
+
+  /** Marks the current position as the end of a complete record. */
+  public void finishRecord() {
+    flushOff = writeOff;
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    // Store the byte directly rather than allocating a one-element array on every call.
+    ensureCapacity(1);
+    buf[writeOff++] = (byte) b;
+  }
+
+  @Override
+  public void write(byte[] b) throws IOException {
+    write(b, 0, b.length);
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    ensureCapacity(len);
+    System.arraycopy(b, off, buf, writeOff, len);
+    writeOff += len;
+  }
+
+  /**
+   * Writes all complete records held in the internal buffer to the underlying stream.
+   *
+   * <p>Bytes belonging to an unfinished record are moved to the start of the buffer and retained.
+   */
+  @Override
+  public void flush() throws IOException {
+    if (flushOff > 0) {
+      out.write(buf, 0, flushOff);
+      // TODO(tjgq): Consider using a ring buffer to avoid this copy.
+      System.arraycopy(buf, flushOff, buf, 0, writeOff - flushOff);
+      writeOff -= flushOff;
+      flushOff = 0;
+    }
+  }
+
+  /**
+   * Flushes all complete records and closes the underlying stream.
+   *
+   * <p>The underlying stream is closed even if the final flush fails, so that it isn't leaked.
+   */
+  @Override
+  public void close() throws IOException {
+    try (OutputStream closing = out) {
+      flush();
+    }
+  }
+
+  /**
+   * Makes room in the internal buffer for {@code len} more bytes.
+   *
+   * <p>Complete records are flushed first; if that doesn't free enough space, the buffer grows.
+   */
+  private void ensureCapacity(int len) throws IOException {
+    if (len > buf.length - writeOff) {
+      // First try to make space by flushing.
+      flush();
+      if (len > buf.length - writeOff) {
+        // If the buffer is too small to fit a single record, grow it to the next power of two.
+        buf = Arrays.copyOf(buf, ceilingPowerOfTwo(writeOff + len));
+      }
+    }
+  }
+}
diff --git a/src/test/java/com/google/devtools/build/lib/util/io/RecordOutputStreamTest.java b/src/test/java/com/google/devtools/build/lib/util/io/RecordOutputStreamTest.java
new file mode 100644
index 0000000..4985979
--- /dev/null
+++ b/src/test/java/com/google/devtools/build/lib/util/io/RecordOutputStreamTest.java
@@ -0,0 +1,132 @@
+// Copyright 2025 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.util.io;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link RecordOutputStream}. */
+@RunWith(JUnit4.class)
+public final class RecordOutputStreamTest {
+  /** Size of a record much larger than the 4KB initial buffer of {@link RecordOutputStream}. */
+  private static final int LARGE_RECORD_SIZE = 65536;
+
+  /** Returns a {@link #LARGE_RECORD_SIZE}-byte record filled with a repeating byte pattern. */
+  private static byte[] createLargeRecord() {
+    byte[] record = new byte[LARGE_RECORD_SIZE];
+    for (int i = 0; i < record.length; i++) {
+      record[i] = (byte) i;
+    }
+    return record;
+  }
+
+  @Test
+  public void empty() throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (RecordOutputStream recordOut = new RecordOutputStream(baos)) {}
+    assertThat(baos.toByteArray()).isEmpty();
+  }
+
+  @Test
+  public void write_singleRecord() throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (RecordOutputStream recordOut = new RecordOutputStream(baos)) {
+      recordOut.write(new byte[] {0x12, 0x34, 0x56});
+      recordOut.finishRecord();
+    }
+    assertThat(baos.toByteArray()).isEqualTo(new byte[] {0x12, 0x34, 0x56});
+  }
+
+  @Test
+  public void write_multipleRecords() throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (RecordOutputStream recordOut = new RecordOutputStream(baos)) {
+      recordOut.write(new byte[] {0x12, 0x34, 0x56});
+      recordOut.finishRecord();
+      recordOut.write(new byte[] {0x21, 0x43, 0x65});
+      recordOut.finishRecord();
+    }
+    assertThat(baos.toByteArray()).isEqualTo(new byte[] {0x12, 0x34, 0x56, 0x21, 0x43, 0x65});
+  }
+
+  @Test
+  public void write_largeRecord_singleWrite() throws IOException {
+    byte[] record = createLargeRecord();
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (RecordOutputStream recordOut = new RecordOutputStream(baos)) {
+      recordOut.write(record);
+      recordOut.finishRecord();
+    }
+    assertThat(baos.toByteArray()).isEqualTo(record);
+  }
+
+  @Test
+  public void write_largeRecord_multipleWrites() throws IOException {
+    byte[] record = createLargeRecord();
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (RecordOutputStream recordOut = new RecordOutputStream(baos)) {
+      for (int i = 0; i < record.length; i++) {
+        recordOut.write(record[i]);
+      }
+      recordOut.finishRecord();
+    }
+    assertThat(baos.toByteArray()).isEqualTo(record);
+  }
+
+  @Test
+  public void write_implicitFlush_onlyCompleteRecords() throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (RecordOutputStream recordOut = new RecordOutputStream(baos)) {
+      recordOut.write(new byte[] {0x12, 0x34});
+      recordOut.finishRecord();
+      // Too large for the remaining buffer space, so the complete record is flushed to make room.
+      recordOut.write(createLargeRecord());
+      assertThat(baos.toByteArray()).isEqualTo(new byte[] {0x12, 0x34});
+    }
+    // The large record was never finished, so closing the stream must not write it out.
+    assertThat(baos.toByteArray()).isEqualTo(new byte[] {0x12, 0x34});
+  }
+
+  @Test
+  public void flush_onlyCompleteRecords() throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (RecordOutputStream recordOut = new RecordOutputStream(baos)) {
+      recordOut.write(new byte[] {0x12, 0x34});
+      recordOut.finishRecord();
+      recordOut.write(new byte[] {0x56, 0x78});
+      recordOut.flush();
+      assertThat(baos.toByteArray()).isEqualTo(new byte[] {0x12, 0x34});
+      recordOut.write(new byte[] {0x21, 0x43});
+      recordOut.finishRecord();
+      recordOut.flush();
+      assertThat(baos.toByteArray()).isEqualTo(new byte[] {0x12, 0x34, 0x56, 0x78, 0x21, 0x43});
+    }
+  }
+
+  @Test
+  public void close_onlyCompleteRecords() throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (RecordOutputStream recordOut = new RecordOutputStream(baos)) {
+      recordOut.write(new byte[] {0x21, 0x34});
+      recordOut.finishRecord();
+      recordOut.write(new byte[] {0x56, 0x78});
+    }
+    assertThat(baos.toByteArray()).isEqualTo(new byte[] {0x21, 0x34});
+  }
+}