blob: 5131b3c71b20314a2a3e1fa8c52222be3cba867b [file] [log] [blame]
// Copyright 2022 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.worker;
import static com.google.common.collect.ImmutableList.toImmutableList;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.eventbus.EventBus;
import com.google.common.flogger.GoogleLogger;
import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.events.Reporter;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.EvictionConfig;
import org.apache.commons.pool2.impl.EvictionPolicy;
/**
* This class kills idle persistent workers at intervals, if the total worker resource usage is
* above a specified limit. Must be used as singleton.
*/
final class WorkerLifecycleManager extends Thread {
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
private boolean isWorking = false;
private boolean emptyEvictionWasLogged = false;
private final WorkerPool workerPool;
private final WorkerOptions options;
private Reporter reporter;
private EventBus eventBus;
public WorkerLifecycleManager(WorkerPool workerPool, WorkerOptions options) {
this.workerPool = workerPool;
this.options = options;
}
public void setReporter(Reporter reporter) {
this.reporter = reporter;
}
public void setEventBus(EventBus eventBus) {
this.eventBus = eventBus;
}
@Override
public void run() {
if (options.totalWorkerMemoryLimitMb == 0 && options.workerMemoryLimitMb == 0) {
return;
}
String msg =
String.format(
"Worker Lifecycle Manager starts work with (total limit: %d MB, limit: %d MB,"
+ " shrinking: %s)",
options.totalWorkerMemoryLimitMb,
options.workerMemoryLimitMb,
options.shrinkWorkerPool ? "enabled" : "disabled");
logger.atInfo().log("%s", msg);
if (reporter != null) {
reporter.handle(Event.info(msg));
}
isWorking = true;
// This loop works until method stopProcessing() called by WorkerModule.
while (isWorking) {
try {
Thread.sleep(options.workerMetricsPollInterval.toMillis());
} catch (InterruptedException e) {
logger.atInfo().withCause(e).log("received interrupt in worker life cycle manager");
break;
}
ImmutableList<WorkerMetric> workerMetrics =
WorkerMetricsCollector.instance().collectMetrics();
if (options.totalWorkerMemoryLimitMb > 0) {
try {
evictWorkers(workerMetrics);
} catch (InterruptedException e) {
logger.atInfo().withCause(e).log("received interrupt in worker life cycle manager");
break;
}
}
if (options.workerMemoryLimitMb > 0) {
killLargeWorkers(workerMetrics, options.workerMemoryLimitMb);
}
}
isWorking = false;
}
void stopProcessing() {
isWorking = false;
}
/** Kills any worker that uses more than {@code limitMb} MB of memory. */
void killLargeWorkers(ImmutableList<WorkerMetric> workerMetrics, int limitMb) {
ImmutableList<WorkerMetric> large =
workerMetrics.stream()
.filter(m -> m.getWorkerStat().getUsedMemoryInKB() / 1000 > limitMb)
.collect(toImmutableList());
for (WorkerMetric l : large) {
String msg;
ImmutableList<Integer> workerIds = l.getWorkerProperties().getWorkerIds();
Optional<ProcessHandle> ph = ProcessHandle.of(l.getWorkerProperties().getProcessId());
if (ph.isPresent()) {
msg =
String.format(
"Killing %s worker %s (pid %d) taking %dMB",
l.getWorkerProperties().getMnemonic(),
workerIds.size() == 1 ? workerIds.get(0) : workerIds,
l.getWorkerProperties().getProcessId(),
l.getWorkerStat().getUsedMemoryInKB() / 1000);
ph.get().destroyForcibly();
logger.atInfo().log("%s", msg);
if (reporter != null) {
reporter.handle(Event.info(msg));
}
if (eventBus != null) {
eventBus.post(
new WorkerEvictedEvent(
l.getWorkerProperties().getWorkerKeyHash(),
l.getWorkerProperties().getMnemonic()));
}
}
}
}
@VisibleForTesting // productionVisibility = Visibility.PRIVATE
void evictWorkers(ImmutableList<WorkerMetric> workerMetrics) throws InterruptedException {
if (options.totalWorkerMemoryLimitMb == 0) {
return;
}
int workerMemoryUsage =
workerMetrics.stream()
.mapToInt(metric -> metric.getWorkerStat().getUsedMemoryInKB() / 1000)
.sum();
// TODO: Remove after b/274608075 is fixed.
if (!workerMetrics.isEmpty()) {
logger.atInfo().atMostEvery(1, TimeUnit.MINUTES).log(
"total worker memory %dMB while limit is %dMB - details: %s",
workerMemoryUsage,
options.totalWorkerMemoryLimitMb,
workerMetrics.stream()
.map(
metric ->
metric.getWorkerProperties().getWorkerIds()
+ " "
+ metric.getWorkerProperties().getMnemonic()
+ " "
+ metric.getWorkerStat().getUsedMemoryInKB()
+ "kB")
.collect(Collectors.joining(", ")));
}
if (workerMemoryUsage <= options.totalWorkerMemoryLimitMb) {
return;
}
ImmutableSet<Integer> candidates =
collectEvictionCandidates(
workerMetrics, options.totalWorkerMemoryLimitMb, workerMemoryUsage);
if (!candidates.isEmpty() || !emptyEvictionWasLogged) {
String msg;
if (candidates.isEmpty()) {
msg =
String.format(
"Could not find any worker eviction candidates. Worker memory usage: %d MB, Memory"
+ " limit: %d MB",
workerMemoryUsage, options.totalWorkerMemoryLimitMb);
} else {
msg =
String.format("Going to evict %d workers with ids: %s", candidates.size(), candidates);
}
logger.atInfo().log("%s", msg);
if (reporter != null) {
reporter.handle(Event.info(msg));
}
}
ImmutableSet<Integer> evictedWorkers = evictCandidates(workerPool, candidates);
if (!evictedWorkers.isEmpty() || !emptyEvictionWasLogged) {
String msg =
String.format(
"Total evicted idle workers %d. With ids: %s", evictedWorkers.size(), evictedWorkers);
logger.atInfo().log("%s", msg);
if (reporter != null) {
reporter.handle(Event.info(msg));
}
emptyEvictionWasLogged = candidates.isEmpty();
}
if (eventBus != null) {
for (WorkerMetric metric : workerMetrics) {
WorkerMetric.WorkerProperties properties = metric.getWorkerProperties();
for (Integer workerId : properties.getWorkerIds()) {
if (evictedWorkers.contains(workerId)) {
eventBus.post(
new WorkerEvictedEvent(properties.getWorkerKeyHash(), properties.getMnemonic()));
}
}
}
}
if (options.shrinkWorkerPool) {
List<WorkerMetric> notEvictedWorkerMetrics =
workerMetrics.stream()
.filter(
metric ->
!evictedWorkers.containsAll(metric.getWorkerProperties().getWorkerIds()))
.collect(Collectors.toList());
int notEvictedWorkerMemoryUsage =
notEvictedWorkerMetrics.stream()
.mapToInt(metric -> metric.getWorkerStat().getUsedMemoryInKB() / 1000)
.sum();
if (notEvictedWorkerMemoryUsage <= options.totalWorkerMemoryLimitMb) {
return;
}
postponeInvalidation(notEvictedWorkerMetrics, notEvictedWorkerMemoryUsage);
}
}
private void postponeInvalidation(
List<WorkerMetric> workerMetrics, int notEvictedWorkerMemoryUsage) {
ImmutableSet<Integer> potentialCandidates =
getCandidates(workerMetrics, options.totalWorkerMemoryLimitMb, notEvictedWorkerMemoryUsage);
if (!potentialCandidates.isEmpty()) {
String msg = String.format("New doomed workers candidates %s", potentialCandidates);
logger.atInfo().log("%s", msg);
if (reporter != null) {
reporter.handle(Event.info(msg));
}
workerPool.setDoomedWorkers(potentialCandidates);
}
}
/**
* Applies eviction police for candidates. Returns the worker ids of evicted workers. We don't
* guarantee that every candidate is going to be evicted. Returns worker ids of evicted workers.
*/
private static ImmutableSet<Integer> evictCandidates(
WorkerPool pool, ImmutableSet<Integer> candidates) throws InterruptedException {
CandidateEvictionPolicy policy = new CandidateEvictionPolicy(candidates);
pool.evictWithPolicy(policy);
return policy.getEvictedWorkers();
}
/** Collects worker candidates to evict. Chooses workers with the largest memory consumption. */
@SuppressWarnings("JdkCollectors")
ImmutableSet<Integer> collectEvictionCandidates(
ImmutableList<WorkerMetric> workerMetrics, int memoryLimitMb, int workerMemoryUsageMb)
throws InterruptedException {
Set<Integer> idleWorkers = getIdleWorkers();
List<WorkerMetric> idleWorkerMetrics =
workerMetrics.stream()
.filter(metric -> idleWorkers.containsAll(metric.getWorkerProperties().getWorkerIds()))
.collect(Collectors.toList());
return getCandidates(idleWorkerMetrics, memoryLimitMb, workerMemoryUsageMb);
}
/**
* Chooses the worker ids of workers with the most usage of memory. Selects workers until total
* memory usage is less than memoryLimitMb.
*/
private static ImmutableSet<Integer> getCandidates(
List<WorkerMetric> workerMetrics, int memoryLimitMb, int usedMemoryMb) {
workerMetrics.sort(new MemoryComparator());
ImmutableSet.Builder<Integer> candidates = ImmutableSet.builder();
int freeMemoryMb = 0;
for (WorkerMetric metric : workerMetrics) {
candidates.addAll(metric.getWorkerProperties().getWorkerIds());
freeMemoryMb += metric.getWorkerStat().getUsedMemoryInKB() / 1000;
if (usedMemoryMb - freeMemoryMb <= memoryLimitMb) {
break;
}
}
return candidates.build();
}
/**
* Calls workerPool.evict() to collect information, but doesn't kill any workers during this
* process.
*/
private Set<Integer> getIdleWorkers() throws InterruptedException {
InfoEvictionPolicy infoEvictionPolicy = new InfoEvictionPolicy();
workerPool.evictWithPolicy(infoEvictionPolicy);
return infoEvictionPolicy.getWorkerIds();
}
/**
* Eviction policy for WorkerPool. Only collects ids of idle workers, doesn't evict any of them.
*/
private static class InfoEvictionPolicy implements EvictionPolicy<Worker> {
private final Set<Integer> workerIds = new HashSet<>();
public InfoEvictionPolicy() {}
@Override
public boolean evict(EvictionConfig config, PooledObject<Worker> underTest, int idleCount) {
workerIds.add(underTest.getObject().getWorkerId());
return false;
}
public Set<Integer> getWorkerIds() {
return workerIds;
}
}
/** Eviction policy for WorkerPool. Evict all idle workers, which were passed in constructor. */
private static class CandidateEvictionPolicy implements EvictionPolicy<Worker> {
private final ImmutableSet<Integer> workerCandidates;
private final Set<Integer> evictedWorkers;
public CandidateEvictionPolicy(ImmutableSet<Integer> workerCandidates) {
this.workerCandidates = workerCandidates;
this.evictedWorkers = new HashSet<>();
}
@Override
public boolean evict(EvictionConfig config, PooledObject<Worker> underTest, int idleCount) {
int workerId = underTest.getObject().getWorkerId();
if (workerCandidates.contains(workerId)) {
evictedWorkers.add(workerId);
logger.atInfo().log(
"Evicting worker %d with mnemonic %s",
workerId, underTest.getObject().getWorkerKey().getMnemonic());
return true;
}
return false;
}
public ImmutableSet<Integer> getEvictedWorkers() {
return ImmutableSet.copyOf(evictedWorkers);
}
}
/** Compare worker metrics by memory consumption in descending order. */
private static class MemoryComparator implements Comparator<WorkerMetric> {
@Override
public int compare(WorkerMetric m1, WorkerMetric m2) {
return m2.getWorkerStat().getUsedMemoryInKB() - m1.getWorkerStat().getUsedMemoryInKB();
}
}
}