// Copyright 2023 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.concurrent;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.devtools.build.lib.concurrent.PriorityWorkerPool.TASKS_MAX_VALUE;
import com.google.common.annotations.VisibleForTesting;
import com.google.devtools.build.lib.unsafe.UnsafeProvider;
import java.util.Arrays;
import java.util.concurrent.ConcurrentLinkedQueue;
import sun.misc.Unsafe;
/**
* A fixed-capacity concurrent FIFO for tasks used by {@link PriorityWorkerPool}.
*
* <p>This class is a higher-performance, nearly garbage-free, but less flexible substitute for
* {@link ConcurrentLinkedQueue}, with the following restrictions:
*
* <ul>
* <li>The queue capacity is fixed.
* <li>The client must guarantee not to take more than it has added.
* <li>The client must have a fallback for when {@link TaskFifo#tryAppend} fails; see the sketch
* below.
* </ul>
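*
* <p>A minimal usage sketch (illustrative only; {@code overflow} stands in for whatever fallback
* the client maintains):
*
* <pre>{@code
* void submit(Runnable task, TaskFifo fifo, Queue<Runnable> overflow) {
*   if (!fifo.tryAppend(task)) {
*     overflow.add(task); // The FIFO was at capacity; fall back.
*   }
* }
* }</pre>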
*
* <p>This class is inspired by Morrison, Adam, and Yehuda Afek. "Fast concurrent queues for x86
* processors." Proceedings of the 18th ACM SIGPLAN symposium on Principles and practice of parallel
* programming. 2013.
*/
final class TaskFifo {
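/** Skip marker installed by a taker that observes an empty position left by a slow appender. */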
private static final Integer SKIP_SLOW_APPENDER = 1;
/**
* The power-of-two capacity of the backing array.
*
* <p>This is one more than the maximum number of elements the queue can contain at one time,
* which reduces the number of bits needed to represent the element count. For example,
* representing the element count for a queue of size 256 requires 9 bits, but only 8 bits are
* needed for a queue of size 255.
*/
@VisibleForTesting static final int CAPACITY = TASKS_MAX_VALUE + 1;
/** ANDing an index with this mask computes it modulo {@link #CAPACITY}. */
private static final int CAPACITY_MASK = CAPACITY - 1;
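// For example, if CAPACITY were 256, the mask would be 0xFF, so increasing indices 254, 255,
// 256, 257 would map to array slots 254, 255, 0, 1, wrapping around the circular buffer.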
/**
* Circular buffer containing tasks and skip metadata.
*
* <p>The algorithm assigns to each caller of {@link #tryAppend} or {@link #take} a monotonically
* increasing index (not counting {@link #tryAppend} calls that would exceed capacity). By client
* restriction, {@link #take} cannot be called more times than {@link #tryAppend} has succeeded, so
* each taker is matched to a previous appender by index.
*
* <p>The naive algorithm based on the above would not be lock-free due to slow or descheduled
* threads. For example, consider a taker assigned to an index where the corresponding appender's
* thread has been descheduled before writing its task to the queue. When the taker observes its
* assigned queue position, it does not see a value. The converse scenario is also possible. An
* appender could see an occupied queue position while expecting {@code null} when a taker on the
* same cyclic offset from a previous epoch is slow.
*
* <p>To resolve these situations, the thread that encounters them actively places a skip marker
* into the queue that must be consumed by its counterpart. A taker that observes a null value
* places an {@link Integer} {@code 1} to mark it, then skips to the next index. On seeing the
* marker, the appender decrements it (with {@code 1} transitioning back to {@code null}). The
* taker, when skipping to the next index, can expect to find a value there because an incomplete
* append does not count as a successful one, so there must be a completed append at a subsequent
* index. Likewise, appenders that have looped all the way around to the same offset should expect
* to find an empty queue position due to capacity constraints.
*
* <p>Appenders that observe a value when expecting an empty position wrap the value in a {@link
* TaskWithSkippedAppends}, then skip to the next available index. Slow takers decrement the counts
* on the wrappers, then skip to the next available index.
*
* <p>The skip marker has a count because the number of threads that could potentially be
* descheduled at a particular index is only limited by the queue capacity, though more than one
* should be extremely rare.
*
* <p>Each queue position contains one of the following.
*
* <ul>
* <li>{@code null} is an empty position.
* <li>{@link Runnable} is a position containing a task.
* <li>{@link Integer} is a count of takers that skipped the position because they observed a
* null value. The count corresponds to slow appenders at the position.
* <li>{@link TaskWithSkippedAppends} is a task with a count of appenders that skipped the
* position due to it being still occupied with a task. The count corresponds to slow takers
* assigned to the position.
* </ul>
*
* <p>Correctness of the algorithm is straightforward. Anytime a taker skips a position, it adds a
* count so that the same number of appenders skip that position and vice versa. Therefore the
* number of takers and appenders skipping any given position stays balanced so the take and
* append indices stay synchronized.
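*
* <p>A concrete interleaving to illustrate: appender A1 is assigned index 5 but stalls before
* writing its task. Taker T1, also assigned index 5, observes {@code null}, installs the {@link
* Integer} marker {@code 1}, and moves on to index 6, where a completed append must be waiting.
* When A1 resumes, it sees the marker, decrements it back to {@code null}, and retries at the
* next append index.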
*/
private final Object[] queue = new Object[CAPACITY];
/**
* Address of the append index; incremented on each append.
*
* <p>The actual array offset is the value modulo {@link #CAPACITY}.
*/
private final long appendIndexAddress;
/**
* Address of the take index; incremented on each take.
*
* <p>The actual array offset is the value modulo {@link #CAPACITY}.
*/
private final long takeIndexAddress;
/**
* Address of a count that is always at least the number of tasks in the queue.
*
* <p>The count is incremented before appending and decremented after taking, so it may transiently
* exceed the true element count.
*/
private final long sizeAddress;
/**
* Constructor.
*
* <p>The caller owns the memory associated with the provided addresses.
*
* @param sizeAddress padded location of the {@code int} size of this queue.
* @param takeIndexAddress padded location of the {@code int} take index.
* @param appendIndexAddress padded location of the {@code int} append index.
*/
TaskFifo(long sizeAddress, long appendIndexAddress, long takeIndexAddress) {
this.sizeAddress = sizeAddress;
this.appendIndexAddress = appendIndexAddress;
this.takeIndexAddress = takeIndexAddress;
// Explicitly initializes the provided addresses.
UNSAFE.putInt(null, sizeAddress, 0);
UNSAFE.putInt(null, appendIndexAddress, 0);
UNSAFE.putInt(null, takeIndexAddress, 0);
}
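// A hypothetical allocation sketch; in reality the caller (e.g. PriorityWorkerPool) owns and
// pads these addresses:
//   long pad = 64; // assumed cache-line pad
//   long block = UNSAFE.allocateMemory(3 * pad);
//   var fifo = new TaskFifo(block, block + pad, block + 2 * pad);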
/**
* Tries to insert a task into the queue.
*
* @return true if successful, false if it would have exceeded the capacity.
*/
boolean tryAppend(Runnable task) {
// Optimistically increments the size and rolls back if it would exceed capacity.
if (UNSAFE.getAndAddInt(null, sizeAddress, 1) >= TASKS_MAX_VALUE) {
UNSAFE.getAndAddInt(null, sizeAddress, -1);
return false;
}
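// The increment above reserves a slot before an append index is claimed, so a rolled-back
// reservation can transiently inflate the size but never advances the append index.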
do {
int offset = getQueueOffset(UNSAFE.getAndAddInt(null, appendIndexAddress, 1));
// In the common case, we can avoid an extra read.
if (UNSAFE.compareAndSwapObject(queue, offset, null, task)) {
return true;
}
do {
// A plain read is sufficient here because this is always preceded by a failed CAS of the
// same memory location.
Object snapshot = UNSAFE.getObject(queue, offset);
// It's possible that the position became empty between the failed CAS and the read above.
if (snapshot == null) {
if (UNSAFE.compareAndSwapObject(queue, offset, null, task)) {
return true;
}
continue; // Refreshes the snapshot.
}
// There's some slowness that has to be resolved.
Object target;
if (snapshot instanceof Integer) {
// There were previous takes without corresponding appends. Acknowledges a taker that
// skipped this offset.
int newCount = ((Integer) snapshot) - 1;
target = newCount == 0 ? null : newCount;
} else if (snapshot instanceof Runnable) {
// A taker was slow.
target = new TaskWithSkippedAppends((Runnable) snapshot, /* skippedAppendCount= */ 1);
} else {
// Multiple takers are slow. This should be very rare. Increments the skip count.
target = ((TaskWithSkippedAppends) snapshot).incrementSkips();
}
if (UNSAFE.compareAndSwapObject(queue, offset, snapshot, target)) {
break; // Success, skips to next.
} // Otherwise refreshes the snapshot.
} while (true);
} while (true);
}
/**
* Takes an available task.
*
* <p>This must not be called more times than {@link #tryAppend} has succeeded.
*/
Runnable take() {
do {
int offset = getQueueOffset(UNSAFE.getAndAddInt(null, takeIndexAddress, 1));
do {
// A plain read is sufficient here.
// 1. The initial read is made safe by the client contract: in most cases, the client
// establishes the necessary happens-before relationship by honoring the constraint of no more
// takes than successful appends.
// 2. On subsequent reads, this immediately follows a failed CAS of the same memory
// location, which refreshes the memory.
Object snapshot = UNSAFE.getObject(queue, offset);
if (snapshot instanceof Runnable) {
// Attempts to take ownership of the task.
if (UNSAFE.compareAndSwapObject(queue, offset, snapshot, null)) {
UNSAFE.getAndAddInt(null, sizeAddress, -1);
return (Runnable) snapshot;
}
} else {
Object target;
if (snapshot == null) {
target = SKIP_SLOW_APPENDER;
} else if (snapshot instanceof Integer) {
// Increments the count due to multiple slow appenders, which should be very rare.
target = ((Integer) snapshot).intValue() + 1;
} else {
// There have been appends without corresponding takes. Acknowledges one skip.
target = ((TaskWithSkippedAppends) snapshot).decrementSkips();
}
if (UNSAFE.compareAndSwapObject(queue, offset, snapshot, target)) {
break; // Success, skips to next.
} // Otherwise refreshes the snapshot.
}
} while (true);
} while (true);
}
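/** Returns the size counter, which may transiently exceed the number of visible tasks. */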
int size() {
return UNSAFE.getIntVolatile(null, sizeAddress);
}
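// Resets the queue to empty. Plain writes and the unsynchronized array fill mean this must not
// race with concurrent tryAppend or take calls.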
void clear() {
UNSAFE.putInt(null, sizeAddress, 0);
UNSAFE.putInt(null, appendIndexAddress, 0);
UNSAFE.putInt(null, takeIndexAddress, 0);
Arrays.fill(queue, null);
}
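// Renders each slot compactly: '0' empty, '1' task, 'S' plus a skip-marker count, 'T' plus a
// skipped-append count. Reads are unsynchronized, so the rendering is best-effort.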
@Override
public String toString() {
int appendIndex = UNSAFE.getIntVolatile(null, appendIndexAddress);
int takeIndex = UNSAFE.getIntVolatile(null, takeIndexAddress);
var helper =
toStringHelper(this)
.add("size", UNSAFE.getIntVolatile(null, sizeAddress))
.add("appendIndex", String.format("%d (%d)", appendIndex, appendIndex & CAPACITY_MASK))
.add("takeIndex", String.format("%d (%d)", takeIndex, takeIndex & CAPACITY_MASK));
StringBuilder buf = new StringBuilder("[");
for (int i = 0; i < CAPACITY; ++i) {
if (i > 0) {
buf.append(',');
}
if (i % 10 == 0) {
buf.append(i).append(':');
}
var elt = queue[i];
if (elt == null) {
buf.append('0');
} else if (elt instanceof Runnable) {
buf.append('1');
} else if (elt instanceof Integer) {
buf.append('S').append(elt);
} else {
buf.append('T').append(((TaskWithSkippedAppends) elt).skippedAppendCount);
}
}
helper.add("queue", buf.append(']').toString());
return helper.toString();
}
@VisibleForTesting
Object[] getQueueForTesting() {
return queue;
}
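/** Converts a monotonically increasing logical index into an Unsafe byte offset into {@link #queue}. */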
private static int getQueueOffset(int index) {
return TASKS_BASE + TASKS_SCALE * (index & CAPACITY_MASK);
}
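/**
* A task wrapped with a count of appenders that skipped its queue position because a slow taker
* had not yet consumed the task.
*/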
@VisibleForTesting
static class TaskWithSkippedAppends {
private final Runnable task;
private final int skippedAppendCount;
private TaskWithSkippedAppends(Runnable task, int skippedAppendCount) {
this.task = task;
this.skippedAppendCount = skippedAppendCount;
}
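/** Acknowledges one skipped append, unwrapping to the bare task once the count reaches zero. */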
private Object decrementSkips() {
if (skippedAppendCount <= 1) {
return task;
}
return new TaskWithSkippedAppends(task, skippedAppendCount - 1);
}
private TaskWithSkippedAppends incrementSkips() {
return new TaskWithSkippedAppends(task, skippedAppendCount + 1);
}
@VisibleForTesting
Runnable taskForTesting() {
return task;
}
@VisibleForTesting
int skippedAppendCountForTesting() {
return skippedAppendCount;
}
}
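// Unsafe machinery: TASKS_BASE and TASKS_SCALE convert logical indices into byte offsets within
// the backing Object[].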
private static final Unsafe UNSAFE = UnsafeProvider.unsafe();
private static final int TASKS_BASE = Unsafe.ARRAY_OBJECT_BASE_OFFSET;
private static final int TASKS_SCALE = Unsafe.ARRAY_OBJECT_INDEX_SCALE;
}