blob: 96b8a5dd573b9839ecff80168ede2a4747b86088 [file] [log] [blame]
// Copyright 2014 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.actions;
import static com.google.devtools.build.lib.profiler.AutoProfiler.profiled;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.profiler.AutoProfiler;
import com.google.devtools.build.lib.profiler.ProfilerTask;
import com.google.devtools.build.lib.util.Pair;
import com.google.devtools.build.lib.worker.Worker;
import com.google.devtools.build.lib.worker.WorkerKey;
import com.google.devtools.build.lib.worker.WorkerPool;
import java.io.IOException;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import javax.annotation.Nullable;
/**
 * Used to keep track of resources consumed by the Blaze action execution threads and throttle them
 * when necessary.
 *
 * <p>Threads which are known to consume a significant amount of resources should call the {@link
 * #acquireResources} method. This method will check whether the requested resources are available
 * and will either mark them as used and allow the thread to proceed, or will block the thread until
 * the requested resources become available. When the thread completes its task, it must release the
 * allocated resources by calling the {@link #releaseResources} method (closing the returned {@link
 * ResourceHandle} does this).
 *
 * <p>Available resources can be calculated in one of two ways:
 *
 * <ol>
 *   <li>They can be preset using the {@link #setAvailableResources(ResourceSet)} method. This is
 *       used mainly by the unit tests (however it is possible to provide a future option that would
 *       artificially limit the amount of CPU/RAM consumed by Blaze).
 *   <li>They can be preset based on the /proc/cpuinfo and /proc/meminfo information. Blaze will
 *       calculate the amount of available CPU cores (adjusting for hyperthreading logical cores)
 *       and the amount of total available memory, and will limit itself to the number of effective
 *       cores and 2/3 of the available memory. For details, please look at the {@link
 *       LocalHostCapacity#getLocalHostCapacity} method.
 * </ol>
 *
 * <p>The resource manager also allows a slight overallocation of the resources to account for the
 * fact that requested resources are usually estimated using a pessimistic approximation. It also
 * guarantees that at least one thread will always be able to acquire any amount of requested
 * resources (even if it is greater than the amount of available resources). Therefore, assuming
 * that threads correctly release acquired resources, Blaze will never be fully blocked.
 */
@ThreadSafe
public class ResourceManager implements ResourceEstimator {

  /**
   * A handle returned by {@link #acquireResources(ActionExecutionMetadata, ResourceSet,
   * ResourcePriority)} that must be closed in order to free the resources again.
   */
  public static class ResourceHandle implements AutoCloseable {
    private final ResourceManager rm;
    private final ActionExecutionMetadata actionMetadata;
    private final ResourceSet resourceSet;
    private Worker worker;

    private ResourceHandle(
        ResourceManager rm,
        ActionExecutionMetadata actionMetadata,
        ResourceSet resources,
        Worker worker) {
      this.rm = rm;
      this.actionMetadata = actionMetadata;
      this.resourceSet = resources;
      this.worker = worker;
    }

    /**
     * Returns the worker handed out together with the resources. May be null, e.g. when the
     * resource set has no worker key or after {@link #invalidateAndClose}.
     */
    @Nullable
    public Worker getWorker() {
      return worker;
    }

    /** Closing the ResourceHandle releases the resources associated with it. */
    @Override
    public void close() throws IOException, InterruptedException {
      rm.releaseResources(actionMetadata, resourceSet, worker);
    }

    /**
     * Invalidates the worker in the worker pool instead of returning it for reuse, then releases
     * the remaining resources held by this handle.
     */
    public void invalidateAndClose() throws IOException, InterruptedException {
      rm.workerPool.invalidateObject(resourceSet.getWorkerKey(), worker);
      worker = null;
      this.close();
    }
  }

  /** Whether the current thread currently holds resources acquired through this manager. */
  private final ThreadLocal<Boolean> threadLocked = ThreadLocal.withInitial(() -> false);

  /**
   * Defines the possible priorities of resources. The earlier elements in this enum will get first
   * chance at grabbing resources.
   */
  public enum ResourcePriority {
    LOCAL, // Local execution not under dynamic execution
    DYNAMIC_WORKER,
    DYNAMIC_STANDALONE
  }

  /** Singleton reference defined in a separate class to ensure thread-safe lazy initialization. */
  private static class Singleton {
    static final ResourceManager instance = new ResourceManager();
  }

  /** Returns singleton instance of the resource manager. */
  public static ResourceManager instance() {
    return Singleton.instance;
  }

  /** Returns prediction of RAM in Mb used by registered actions. */
  @Override
  public synchronized double getUsedMemoryInMb() {
    // Entries are dropped once their usage falls to ~0 (see releaseResourcesOnly), so a missing
    // entry means nothing is in use.
    return usedResources.getOrDefault(ResourceSet.MEMORY, 0.0);
  }

  /** Returns prediction of CPUs used by registered actions. */
  @Override
  public synchronized double getUsedCPU() {
    // A missing entry means no CPU is in use (see releaseResourcesOnly).
    return usedResources.getOrDefault(ResourceSet.CPU, 0.0);
  }

  // Allocated resources are allowed to go "negative", but at least
  // MIN_NECESSARY_RATIO portion of each resource should be available.
  // Please note that this value is purely empirical - we assume that generally
  // requested resources are somewhat pessimistic and thread would end up
  // using less than requested amount.
  private static final Double DEFAULT_MIN_NECESSARY_RATIO = 1.0;
  private static final ImmutableMap<String, Double> MIN_NECESSARY_RATIO =
      ImmutableMap.of(ResourceSet.CPU, 0.6);

  // Queues of blocked requests, one per priority. The associated CountDownLatch is always created
  // with a count of 1 in acquire(). LinkedList is used because elements are frequently removed
  // from the middle of the queue while iterating over it.
  @SuppressWarnings("JdkObsolete")
  private final Deque<Pair<ResourceSet, LatchWithWorker>> localRequests = new LinkedList<>();

  @SuppressWarnings("JdkObsolete")
  private final Deque<Pair<ResourceSet, LatchWithWorker>> dynamicWorkerRequests =
      new LinkedList<>();

  @SuppressWarnings("JdkObsolete")
  private final Deque<Pair<ResourceSet, LatchWithWorker>> dynamicStandaloneRequests =
      new LinkedList<>();

  private WorkerPool workerPool;

  // The total amount of resources available to Bazel on the local host. Must be set by
  // an explicit call to setAvailableResources(), often using
  // LocalHostCapacity.getLocalHostCapacity() as an argument.
  @VisibleForTesting public ResourceSet availableResources = null;

  // Used amount of resources, keyed by the resource names defined in the ResourceSet class.
  // Initialized eagerly so that accessors do not fail before the first resetResourceUsage().
  private Map<String, Double> usedResources = new HashMap<>();

  // Used local test count. Corresponds to the local test count definition in the ResourceSet class.
  private int usedLocalTestCount;

  @VisibleForTesting
  public static ResourceManager instanceForTestingOnly() {
    return new ResourceManager();
  }

  /**
   * Resets resource manager state and releases all thread locks.
   *
   * <p>Note - it does not reset available resources. Use separate call to setAvailableResources().
   */
  public synchronized void resetResourceUsage() {
    usedResources = new HashMap<>();
    usedLocalTestCount = 0;
    for (Pair<ResourceSet, LatchWithWorker> request : localRequests) {
      request.second.latch.countDown();
    }
    for (Pair<ResourceSet, LatchWithWorker> request : dynamicWorkerRequests) {
      request.second.latch.countDown();
    }
    for (Pair<ResourceSet, LatchWithWorker> request : dynamicStandaloneRequests) {
      request.second.latch.countDown();
    }
    localRequests.clear();
    dynamicWorkerRequests.clear();
    dynamicStandaloneRequests.clear();
  }

  /**
   * Sets available resources using given resource set.
   *
   * <p>Must be called at least once before using resource manager.
   */
  public synchronized void setAvailableResources(ResourceSet resources) {
    Preconditions.checkNotNull(resources);
    resetResourceUsage();
    availableResources = resources;
  }

  /** Sets worker pool for taking the workers. Must be called before requesting the workers. */
  public void setWorkerPool(WorkerPool workerPool) {
    this.workerPool = workerPool;
  }

  /**
   * Acquires requested resource set. Will block if resource is not available. NB! This method must
   * be thread-safe!
   */
  public ResourceHandle acquireResources(
      ActionExecutionMetadata owner, ResourceSet resources, ResourcePriority priority)
      throws InterruptedException, IOException {
    Preconditions.checkNotNull(
        resources, "acquireResources called with resources == NULL during %s", owner);
    Preconditions.checkState(
        !threadHasResources(), "acquireResources with existing resource lock during %s", owner);

    LatchWithWorker latchWithWorker = null;
    AutoProfiler p =
        profiled("Acquiring resources for: " + owner.describe(), ProfilerTask.ACTION_LOCK);
    try {
      latchWithWorker = acquire(resources, priority);
      if (latchWithWorker.latch != null) {
        latchWithWorker.latch.await();
      }
    } catch (InterruptedException e) {
      boolean releaseRequired = false;
      Worker grantedWorker = null;
      // Synchronize on this to avoid any racing with #processWaitingThreads.
      synchronized (this) {
        if (latchWithWorker != null) {
          if (latchWithWorker.latch == null || latchWithWorker.latch.getCount() == 0) {
            // Resources were already acquired by the other side; they are released below.
            releaseRequired = true;
            grantedWorker = latchWithWorker.worker;
          } else {
            // Inform the other side that resources shouldn't be acquired.
            latchWithWorker.latch.countDown();
          }
        }
      }
      // Release outside of the synchronized block to avoid deadlock: release() returns the worker
      // to the pool, which it deliberately does without holding this manager's lock.
      if (releaseRequired) {
        release(resources, grantedWorker);
      }
      throw e;
    }

    threadLocked.set(true);

    CountDownLatch latch;
    Worker worker;
    synchronized (this) {
      latch = latchWithWorker.latch;
      worker = latchWithWorker.worker;
    }

    // Profile acquisition only if it waited for resource to become available.
    if (latch != null) {
      p.complete();
    }
    return new ResourceHandle(this, owner, resources, worker);
  }

  /**
   * Marks {@code resources} as used and, if they carry a worker key, borrows a worker for it.
   * Callers ({@link #acquire} and {@link #processWaitingThreads}) hold this manager's lock.
   *
   * @return the borrowed worker, or null if {@code resources} has no worker key
   */
  @Nullable
  private Worker incrementResources(ResourceSet resources)
      throws IOException, InterruptedException {
    // Borrow the worker before updating the counters: if borrowing fails, no usage must be left
    // recorded, because no ResourceHandle will exist to release it.
    Worker worker = null;
    if (resources.getWorkerKey() != null) {
      worker = this.workerPool.borrowObject(resources.getWorkerKey());
    }
    resources.getResources().forEach((key, value) -> usedResources.merge(key, value, Double::sum));
    usedLocalTestCount += resources.getLocalTestCount();
    return worker;
  }

  /** Return true if any resources have been claimed through this manager. */
  public synchronized boolean inUse() {
    return !usedResources.isEmpty()
        || usedLocalTestCount != 0
        || !localRequests.isEmpty()
        || !dynamicWorkerRequests.isEmpty()
        || !dynamicStandaloneRequests.isEmpty();
  }

  /** Return true iff this thread has a lock on non-zero resources. */
  public boolean threadHasResources() {
    return threadLocked.get();
  }

  /**
   * Releases previously requested resources.
   *
   * <p>NB! This method must be thread-safe!
   *
   * @param owner action metadata whose resources should be released
   * @param resources the resources to release
   * @param worker the worker used during execution, if any; it is returned to the worker pool
   * @throws java.io.IOException if the worker could not be returned to the worker pool
   */
  void releaseResources(
      ActionExecutionMetadata owner, ResourceSet resources, @Nullable Worker worker)
      throws IOException, InterruptedException {
    Preconditions.checkNotNull(
        resources, "releaseResources called with resources == NULL during %s", owner);
    Preconditions.checkState(
        threadHasResources(), "releaseResources without resource lock during %s", owner);

    boolean resourcesReused = false;
    AutoProfiler p = profiled(owner.describe(), ProfilerTask.ACTION_RELEASE);
    try {
      resourcesReused = release(resources, worker);
    } finally {
      threadLocked.set(false);

      // Profile resource release only if it resolved at least one allocation request.
      if (resourcesReused) {
        p.complete();
      }
    }
  }

  // TODO (b/241066751) find better way to change resource ownership
  /** Marks the current thread as no longer holding resources, without releasing them. */
  public void releaseResourceOwnership() {
    threadLocked.set(false);
  }

  /** Marks the current thread as holding resources, without acquiring any. */
  public void acquireResourceOwnership() {
    threadLocked.set(true);
  }

  /**
   * Returns the pair of worker and latch. Worker should be null if there is no workerKey in
   * resources. The latch isn't null if we could not acquire the resources right now and need to
   * wait.
   */
  private synchronized LatchWithWorker acquire(ResourceSet resources, ResourcePriority priority)
      throws IOException, InterruptedException, NoSuchElementException {
    if (areResourcesAvailable(resources)) {
      Worker worker = incrementResources(resources);
      return new LatchWithWorker(/* latch= */ null, worker);
    }
    Pair<ResourceSet, LatchWithWorker> request =
        new Pair<>(resources, new LatchWithWorker(new CountDownLatch(1), /* worker= */ null));
    switch (priority) {
      case LOCAL:
        localRequests.addLast(request);
        break;
      case DYNAMIC_WORKER:
        // Dynamic requests should be LIFO, because we are more likely to win the race on newer
        // actions.
        dynamicWorkerRequests.addFirst(request);
        break;
      case DYNAMIC_STANDALONE:
        // Dynamic requests should be LIFO, because we are more likely to win the race on newer
        // actions.
        dynamicStandaloneRequests.addFirst(request);
        break;
    }
    return request.second;
  }

  /**
   * Releases resources and processes the queues of waiting threads. Returns true when at least one
   * waiting request was granted resources.
   */
  private boolean release(ResourceSet resources, @Nullable Worker worker)
      throws IOException, InterruptedException {
    // We need to release the worker first to not block highPriorityWorkerMnemonics management. See
    // more on b/244297036.
    if (worker != null) {
      this.workerPool.returnObject(worker.getWorkerKey(), worker);
    }

    releaseResourcesOnly(resources);

    return processAllWaitingThreads();
  }

  /** Subtracts {@code resources} from the used counters, dropping entries that reach ~0. */
  private synchronized void releaseResourcesOnly(ResourceSet resources) {
    usedLocalTestCount -= resources.getLocalTestCount();

    // TODO(bazel-team): (2010) rounding error can accumulate and value below can end up being
    // e.g. 1E-15. So if it is small enough, we set it to 0. But maybe there is a better solution.
    double epsilon = 0.0001;

    Set<String> toRemove = new HashSet<>();
    for (Map.Entry<String, Double> resource : resources.getResources().entrySet()) {
      String key = resource.getKey();
      double value = usedResources.getOrDefault(key, 0.0) - resource.getValue();
      usedResources.put(key, value);
      if (value < epsilon) {
        toRemove.add(key);
      }
    }
    usedResources.keySet().removeAll(toRemove);
  }

  /**
   * Grants resources to waiting requests in priority order: local, then dynamic worker, then
   * dynamic standalone. Returns true if at least one waiting request was granted resources.
   */
  private synchronized boolean processAllWaitingThreads() throws IOException, InterruptedException {
    // Non-short-circuit |= so that every queue is processed.
    boolean anyGranted = processWaitingThreads(localRequests);
    anyGranted |= processWaitingThreads(dynamicWorkerRequests);
    anyGranted |= processWaitingThreads(dynamicStandaloneRequests);
    return anyGranted;
  }

  /**
   * Walks {@code requests} in order, granting every request whose resources are now available and
   * dropping requests cancelled by their (interrupted) waiting thread.
   *
   * @return true if at least one request was granted resources
   */
  private synchronized boolean processWaitingThreads(
      Deque<Pair<ResourceSet, LatchWithWorker>> requests)
      throws IOException, InterruptedException {
    boolean anyGranted = false;
    Iterator<Pair<ResourceSet, LatchWithWorker>> iterator = requests.iterator();
    while (iterator.hasNext()) {
      Pair<ResourceSet, LatchWithWorker> request = iterator.next();
      if (request.second.latch.getCount() == 0) {
        // Cancelled by other side.
        iterator.remove();
      } else if (areResourcesAvailable(request.first)) {
        request.second.worker = incrementResources(request.first);
        request.second.latch.countDown();
        iterator.remove();
        anyGranted = true;
      }
    }
    return anyGranted;
  }

  /** Throws an exception if requested extra resource isn't being tracked */
  private void assertResourcesTracked(ResourceSet resources) throws NoSuchElementException {
    for (Map.Entry<String, Double> resource : resources.getResources().entrySet()) {
      String key = resource.getKey();
      if (!availableResources.getResources().containsKey(key)) {
        throw new NoSuchElementException(
            "Resource " + key + " is not tracked in this resource set.");
      }
    }
  }

  private static <T extends Number> boolean isAvailable(T available, T used, T requested) {
    // Resources are considered available if any one of the conditions below is true:
    // 1) If resource is not requested at all, it is available.
    // 2) If resource is not used at the moment, it is considered to be
    // available regardless of how much is requested. This is necessary to
    // ensure that at any given time, at least one thread is able to acquire
    // resources even if it requests more than available.
    // 3) If the used amount plus the requested amount does not exceed the total available amount.
    return requested.doubleValue() == 0
        || used.doubleValue() == 0
        || used.doubleValue() + requested.doubleValue() <= available.doubleValue();
  }

  // Method will return true if all requested resources are considered to be available.
  @VisibleForTesting
  boolean areResourcesAvailable(ResourceSet resources) throws NoSuchElementException {
    Preconditions.checkNotNull(availableResources);
    // Comparison below is robust, since any calculation errors will be fixed
    // by the release() method.

    WorkerKey workerKey = resources.getWorkerKey();
    if (workerKey != null) {
      int availableWorkers = this.workerPool.getMaxTotalPerKey(workerKey);
      int activeWorkers = this.workerPool.getNumActive(workerKey);
      if (activeWorkers >= availableWorkers) {
        return false;
      }
    }

    // We test for tracking of extra resources whenever acquired and throw an
    // exception before acquiring any untracked resource.
    assertResourcesTracked(resources);

    if (usedResources.isEmpty() && usedLocalTestCount == 0) {
      return true;
    }

    int availableLocalTestCount = availableResources.getLocalTestCount();
    if (!isAvailable(availableLocalTestCount, usedLocalTestCount, resources.getLocalTestCount())) {
      return false;
    }

    for (Map.Entry<String, Double> resource : resources.getResources().entrySet()) {
      String key = resource.getKey();

      // Use only MIN_NECESSARY_RATIO of the resource value to check for
      // allocation. This is necessary to account for the fact that most of the
      // requested resource sets use pessimistic estimations. Note that this
      // ratio is used only during comparison - for tracking we will actually
      // mark whole requested amount as used.
      double requested =
          resource.getValue() * MIN_NECESSARY_RATIO.getOrDefault(key, DEFAULT_MIN_NECESSARY_RATIO);
      double used = usedResources.getOrDefault(key, 0.0);
      double available = availableResources.get(key);
      if (!isAvailable(available, used, requested)) {
        return false;
      }
    }
    return true;
  }

  @VisibleForTesting
  synchronized int getWaitCount() {
    return localRequests.size() + dynamicStandaloneRequests.size() + dynamicWorkerRequests.size();
  }

  /**
   * A waiting (or immediately granted) request: {@code latch} is null when resources were granted
   * right away; {@code worker} is filled in under the manager's lock when resources are granted.
   */
  private static class LatchWithWorker {
    public final CountDownLatch latch;
    public Worker worker;

    public LatchWithWorker(CountDownLatch latch, Worker worker) {
      this.latch = latch;
      this.worker = worker;
    }
  }
}