blob: b424b1bb541ce6fab4f7a1d9b3d47a38baf19c1f [file] [log] [blame]
// Copyright 2014 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.util;
import com.google.common.collect.ForwardingConcurrentMap;
import com.google.common.collect.Sets;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.util.MapCodec.IncompatibleFormatException;
import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.Path;
import java.io.IOException;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
/**
* A map that is backed by persistent storage. It uses two files on disk for this: The first file
* contains all the entries and gets written when invoking the {@link #save()} method. The second
* file contains a journal of all entries that were added to or removed from the map since
* constructing the instance of the map or the last invocation of {@link #save()} and gets written
* after each update of the map although sub-classes are free to implement their own journal update
* strategy.
*
* <p><b>Ceci n'est pas un Map</b>. Strictly speaking, the {@link Map} interface doesn't permit the
* possibility of failure. This class uses persistence; persistence means I/O, and I/O means the
* possibility of failure. Therefore the semantics of this may deviate from the Map contract in
* failure cases. In particular, updates are not guaranteed to succeed. However, I/O failures are
* guaranteed to be reported upon the subsequent call to a method that throws {@code IOException}
* such as {@link #save}.
*
* <p>To populate the map entries using the previously persisted entries call {@link #load()} prior
* to invoking any other map operation.
*
* <p>Like {@link Hashtable} but unlike {@link HashMap}, this class does <em>not</em> allow
* <tt>null</tt> to be used as a key or a value.
*
* <p>IO failures during reading or writing the map entries to disk may result in {@link
* AssertionError} getting thrown from the failing method.
*
* <p>The constructor allows passing in a version number that gets written to the files on disk and
* checked before reading from disk. Files with an incompatible version number will be ignored. This
* allows the client code to change the persistence format without polluting the file system name
* space.
*/
public abstract class PersistentMap<K, V> extends ForwardingConcurrentMap<K, V> {
private final int version;
@GuardedBy("this")
private final Path mapFile;
@GuardedBy("this")
private final Path journalFile;
private final LinkedBlockingQueue<K> journal;
@GuardedBy("this")
private MapCodec<K, V>.Writer journalOut;
/**
* If non-null, contains the message from an {@code IOException} thrown by a previously failed
* write. This error is deferred until the next call to a method which is able to throw an
* exception.
*/
@GuardedBy("this")
@Nullable
private String deferredIOFailure = null;
/**
* 'loaded' is true when the in-memory representation is at least as recent as the on-disk
* representation.
*/
private boolean loaded;
private final ConcurrentMap<K, V> delegate;
private final MapCodec<K, V> codec;
/**
* Creates a new PersistentMap instance using the specified backing map.
*
* @param version the version tag. Changing the version tag allows updating the on disk format.
* The map will never read from a file that was written using a different version tag.
* @param codec the codec used to convert between the in-memory and on-disk representations.
* @param map the backing map to use for this PersistentMap.
* @param mapFile the file to save the map entries to.
* @param journalFile the journal file to write entries between invocations of {@link #save()}.
*/
protected PersistentMap(
int version, MapCodec<K, V> codec, ConcurrentMap<K, V> map, Path mapFile, Path journalFile) {
this.version = version;
this.codec = codec;
journal = new LinkedBlockingQueue<>();
this.mapFile = mapFile;
this.journalFile = journalFile;
delegate = map;
}
@Override
protected final ConcurrentMap<K, V> delegate() {
return delegate;
}
@ThreadSafe
@Override
@Nullable
public V put(K key, V value) {
V previous = delegate.put(key, value);
journal.add(key);
maybeFlushJournal();
return previous;
}
@ThreadSafe
@Override
@Nullable
public V putIfAbsent(K key, V value) {
V previous = delegate.putIfAbsent(key, value);
if (previous == null) {
journal.add(key);
maybeFlushJournal();
}
return previous;
}
/**
* Potentially flushes the in-memory journal to disk, as determined by {@link
* #shouldFlushJournal()}.
*/
@ThreadSafe
private void maybeFlushJournal() {
if (shouldFlushJournal()) {
flushJournal();
}
}
/**
* Determines whether the in-memory journal should be flushed to disk.
*
* <p>Called whenever an update is appended to the in-memory journal. The default is to flush it
* immediately, but subclasses are free to override this to implement their own strategy.
*/
protected boolean shouldFlushJournal() {
return true;
}
@ThreadSafe
@Override
@SuppressWarnings("unchecked")
@Nullable
public V remove(Object object) {
V previous = delegate.remove(object);
if (previous != null) {
// we know that 'object' must be an instance of K, because the
// remove call succeeded, i.e. 'object' was mapped to 'previous'.
journal.add((K) object); // unchecked
maybeFlushJournal();
}
return previous;
}
@Override
public V replace(K key, V value) {
throw new UnsupportedOperationException();
}
@Override
public boolean replace(K key, V oldValue, V newValue) {
throw new UnsupportedOperationException();
}
/** Flushes the in-memory journal to disk. */
public synchronized void flushJournal() {
try {
if (journalOut == null) {
// Append to a preexisting journal file, which may have been left around after the last
// save() because shouldKeepJournal() was true.
journalOut = codec.createWriter(journalFile, version, /* overwrite= */ false);
}
// Journal may have duplicates, we can ignore them.
LinkedHashSet<K> keys = Sets.newLinkedHashSetWithExpectedSize(journal.size());
journal.drainTo(keys);
writeEntries(journalOut, keys);
journalOut.flush();
} catch (IOException e) {
this.deferredIOFailure = e.getMessage() + " during journal append";
}
}
/**
* Loads the previous written map entries from disk.
*
* <p>If no on-disk state exists, loading is successful and produces an empty map.
*
* <p>Data corruption is handled differently for each file:
*
* <ul>
* <li>Corruption in the map file is treated as an error, as the file is updated atomically.
* <li>Corruption in the journal file is tolerated by ignoring the remaining contents, as the
* file is updated non-atomically.
*
* @throws IncompatibleFormatException if the on-disk data is in an incompatible format
* @throws IOException if data corruption is detected and cannot be recovered from, or some other
* I/O error occurs
*/
public synchronized void load() throws IOException {
if (!loaded) {
if (mapFile.exists()) {
loadEntries(mapFile);
}
if (journalFile.exists()) {
try {
loadEntries(journalFile);
} catch (IOException e) {
if (e instanceof IncompatibleFormatException) {
throw e;
}
}
// Merge the journal into the map file and delete the former, ensuring that we don't keep
// appending to a corrupted journal.
// TODO(tjgq): Avoid doing this unless journal corruption was detected.
save(/* fullSave= */ true);
}
loaded = true;
}
}
@Override
public synchronized void clear() {
super.clear();
try {
// We must do a full save because we're bypassing the journal.
save(/* fullSave= */ true);
} catch (IOException e) {
this.deferredIOFailure = e.getMessage() + " during map clear";
}
}
/**
* Saves the map to disk.
*
* @throws IOException if there was an I/O error during this call, or any previous call since the
* last save().
*/
public synchronized long save() throws IOException {
return save(false);
}
/**
* Saves the map to disk.
*
* @param fullSave if true, the journal file will be merged into the map file and deleted;
* otherwise, the decision is made by {@link #shouldKeepJournal()}.
* @throws IOException if there was an I/O error during this call, or any previous call since the
* last save().
*/
private synchronized long save(boolean fullSave) throws IOException {
/* Report a previously failing I/O operation. */
if (deferredIOFailure != null) {
try {
throw new IOException(deferredIOFailure);
} finally {
deferredIOFailure = null;
}
}
if (!fullSave && shouldKeepJournal()) {
flushJournal();
journalOut.close();
journalOut = null;
} else {
Path mapTemp =
mapFile.getRelative(FileSystemUtils.replaceExtension(mapFile.asFragment(), ".tmp"));
try {
saveEntries(mapTemp);
mapFile.delete();
mapTemp.renameTo(mapFile);
} finally {
mapTemp.delete();
}
clearJournal();
journalFile.delete();
}
return journalSize() + cacheSize();
}
protected final synchronized long journalSize() throws IOException {
return journalFile.exists() ? journalFile.getFileSize() : 0;
}
protected final synchronized long cacheSize() throws IOException {
return mapFile.exists() ? mapFile.getFileSize() : 0;
}
/**
* Whether to keep the journal file on save.
*
* <p>The default is to always merge the journal file into the main file and delete the former,
* but subclasses are free to override this to implement their own strategy.
*/
protected boolean shouldKeepJournal() {
return false;
}
private synchronized void clearJournal() throws IOException {
journal.clear();
if (journalOut != null) {
journalOut.close();
journalOut = null;
}
}
/**
* Loads all entries from the given file into the backing map.
*
* @throws IncompatibleFormatException if the file is in an incompatible format
* @throws IOException if the file is corrupted or an I/O error occurs
*/
private synchronized void loadEntries(Path mapFile) throws IOException {
try (MapCodec<K, V>.Reader in = codec.createReader(mapFile, version)) {
readEntries(in);
}
}
/** Saves all backing map entries to the given file, overwriting preexisting contents. */
private synchronized void saveEntries(Path mapFile) throws IOException {
try (MapCodec<K, V>.Writer out = codec.createWriter(mapFile, version, /* overwrite= */ true)) {
writeEntries(out, null);
}
}
/**
* Writes backing map entries for a set of keys into a {@link MapCodec.Writer}.
*
* @param out the {@link MapCodec.Writer} to write to.
* @param keys the keys that are to be written, or null to write all keys.
* @throws IOException
*/
private void writeEntries(MapCodec<K, V>.Writer out, @Nullable Set<K> keys) throws IOException {
Map<K, V> map = delegate();
for (K key : keys != null ? keys : map.keySet()) {
out.writeEntry(key, map.get(key));
}
}
/**
* Reads entries from a {@link MapCodec.Reader} into the backing map.
*
* @param in the {@link MapCodec.Reader} to read from.
*/
private void readEntries(MapCodec<K, V>.Reader in) throws IOException {
Map<K, V> map = delegate();
MapCodec.Entry<K, V> entry;
while ((entry = in.readEntry()) != null) {
K key = entry.key();
V value = entry.value();
if (value != null) {
map.put(key, value);
} else {
map.remove(key);
}
}
}
}