// blob: b301f6db93f86f178cb6c367ee772d37b6ae56dc [file] [log] [blame]
// Copyright 2022 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.worker;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static java.nio.charset.StandardCharsets.UTF_8;

import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Splitter;
import com.google.common.base.VerifyException;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.flogger.GoogleLogger;
import com.google.devtools.build.lib.clock.Clock;
import com.google.devtools.build.lib.util.OS;
import com.google.devtools.build.lib.worker.WorkerMetric.WorkerStat;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Collects and populates system metrics about persistent workers.
 *
 * <p>The collector is a static singleton obtained through {@link #instance()}. A {@link Clock}
 * must be supplied through {@link #setClock} before calling any method that reads the current time
 * ({@link #registerWorker}, {@link #collectMetrics()}, {@link #collectMetrics(Duration)}); until
 * then the clock reference is {@code null} and those methods throw {@link NullPointerException}.
 */
public class WorkerMetricsCollector {
  private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

  /** The metrics collector (a static singleton instance). Inactive by default. */
  private static final WorkerMetricsCollector instance = new WorkerMetricsCollector();

  private Clock clock;

  /**
   * Mapping of worker ids to the properties of the registered worker. Workers whose memory usage
   * could not be measured are removed from this map by {@link #collectMetrics()}.
   */
  private final Map<Integer, WorkerMetric.WorkerProperties> workerIdToWorkerProperties =
      new ConcurrentHashMap<>();

  /** Mapping of worker ids to the time of their most recent {@link #registerWorker} call. */
  private final Map<Integer, Instant> workerLastCallTime = new ConcurrentHashMap<>();

  /**
   * Result of the most recent {@link #collectMetrics()} call, or {@code null} if nothing has been
   * collected since construction or the last {@link #clear()}. Volatile because it is written by
   * {@link #clear()} and {@link #updateLastCollectMetrics} and read without a lock by {@link
   * #collectMetrics(Duration)}.
   */
  private volatile MetricsWithTime lastMetrics;

  private WorkerMetricsCollector() {}

  /** Returns the singleton collector. */
  public static WorkerMetricsCollector instance() {
    return instance;
  }

  /** Sets the clock used to timestamp registrations and collections. */
  public void setClock(Clock clock) {
    this.clock = clock;
  }

  /**
   * Collects the memory usage of each given process together with all of its descendants. If a pid
   * does not allow collecting memory usage (it is absent from the {@code ps} output), it is
   * silently left out of the result.
   *
   * @param os the operating system; anything other than Linux or macOS yields an empty result
   * @param processIds pids of the root processes to measure
   * @return the memory per measurable pid in kilobytes, plus the time of collection
   * @throws VerifyException if collecting the process table fails with a runtime exception
   */
  MemoryCollectionResult collectMemoryUsageByPid(OS os, ImmutableSet<Long> processIds) {
    // TODO(b/181317827): Support Windows.
    if (processIds.isEmpty() || (os != OS.LINUX && os != OS.DARWIN)) {
      return new MemoryCollectionResult(
          ImmutableMap.of(), Instant.ofEpochMilli(clock.currentTimeMillis()));
    }

    ImmutableMap<Long, PsInfo> psInfos;
    try {
      psInfos = collectDataFromPs();
    } catch (RuntimeException e) {
      throw new VerifyException(
          String.format("Could not collect data for pids: %s", processIds), e);
    }

    ImmutableMap<Long, Integer> pidToMemoryInKb = summarizeDescendantsMemory(psInfos, processIds);
    return new MemoryCollectionResult(
        pidToMemoryInKb, Instant.ofEpochMilli(clock.currentTimeMillis()));
  }

  /**
   * Computes, for every requested pid present in {@code pidToPsInfo}, the total memory of that
   * process and all of its descendants. Requested pids missing from {@code pidToPsInfo} are
   * omitted from the result.
   */
  private ImmutableMap<Long, Integer> summarizeDescendantsMemory(
      ImmutableMap<Long, PsInfo> pidToPsInfo, ImmutableSet<Long> processIds) {
    // Index the process table by parent pid so children can be looked up directly.
    HashMultimap<Long, PsInfo> parentPidToPsInfo = HashMultimap.create();
    for (PsInfo psInfo : pidToPsInfo.values()) {
      parentPidToPsInfo.put(psInfo.getParentPid(), psInfo);
    }

    ImmutableMap.Builder<Long, Integer> pidToTotalMemoryInKb = ImmutableMap.builder();
    for (Long pid : processIds) {
      PsInfo psInfo = pidToPsInfo.get(pid);
      if (psInfo == null) {
        continue;
      }
      pidToTotalMemoryInKb.put(pid, collectMemoryUsageOfDescendants(psInfo, parentPidToPsInfo));
    }
    return pidToTotalMemoryInKb.buildOrThrow();
  }

  /**
   * Sums the memory usage of the given process and all of its descendants.
   *
   * <p>The walk is iterative and remembers visited pids, so a row that lists itself as its own
   * parent (or any other parent/child cycle in the snapshot) is counted once instead of causing
   * unbounded recursion.
   */
  private int collectMemoryUsageOfDescendants(
      PsInfo psInfo, HashMultimap<Long, PsInfo> parentPidToPsInfo) {
    int totalMemoryInKb = 0;
    Set<Long> visitedPids = new HashSet<>();
    Deque<PsInfo> pending = new ArrayDeque<>();
    pending.push(psInfo);
    while (!pending.isEmpty()) {
      PsInfo current = pending.pop();
      if (!visitedPids.add(current.getPid())) {
        continue;
      }
      totalMemoryInKb += current.getMemoryInKb();
      pending.addAll(parentPidToPsInfo.get(current.getPid()));
    }
    return totalMemoryInKb;
  }

  /**
   * Runs {@code ps} and parses its output into one {@link PsInfo} per process.
   *
   * <p>Failures are handled on a best-effort basis: if {@code ps} cannot be started an empty map
   * is returned; rows that cannot be parsed are logged and skipped; if reading fails part-way, the
   * rows parsed so far are returned.
   */
  private ImmutableMap<Long, PsInfo> collectDataFromPs() {
    Process psProcess;
    try {
      psProcess = buildPsProcess();
    } catch (IOException e) {
      logger.atWarning().withCause(e).log("Error while executing command ps");
      return ImmutableMap.of();
    }

    ImmutableMap.Builder<Long, PsInfo> psInfos = ImmutableMap.builder();
    // try-with-resources closes the pipe to the ps process even when parsing fails.
    try (BufferedReader psOutput =
        new BufferedReader(new InputStreamReader(psProcess.getInputStream(), UTF_8))) {
      // The output of the above ps command looks similar to this:
      // PID PPID RSS
      // 211706 1 222972
      // 2612333 211706 6180
      // We skip over the first line (the header) and then parse the PID, the parent PID and the
      // resident memory size in kilobytes.
      psOutput.readLine();
      String output;
      while ((output = psOutput.readLine()) != null) {
        List<String> line = Splitter.on(" ").trimResults().omitEmptyStrings().splitToList(output);
        if (line.size() != 3) {
          logger.atWarning().log("Unexpected length of split line %s %d", output, line.size());
          continue;
        }
        try {
          long pid = Long.parseLong(line.get(0));
          long parentPid = Long.parseLong(line.get(1));
          int memoryInKb = Integer.parseInt(line.get(2));
          psInfos.put(pid, PsInfo.create(pid, parentPid, memoryInKb));
        } catch (NumberFormatException e) {
          // A single malformed row must not discard the rest of the process table.
          logger.atWarning().withCause(e).log("Error while parsing ps output line: %s", output);
        }
      }
    } catch (IOException e) {
      logger.atWarning().withCause(e).log("Error while reading ps output");
    }
    return psInfos.buildOrThrow();
  }

  /** Parsed information about process collected after ps command call. */
  @AutoValue
  public abstract static class PsInfo {
    /** Returns the process id. */
    public abstract long getPid();

    /** Returns the id of the parent process. */
    public abstract long getParentPid();

    /** Returns the memory value reported by {@code ps} for this process, in kilobytes. */
    public abstract int getMemoryInKb();

    public static PsInfo create(long pid, long parentPid, int memoryInKb) {
      return new AutoValue_WorkerMetricsCollector_PsInfo(pid, parentPid, memoryInKb);
    }
  }

  /** Starts the {@code ps} process whose output lists pid, parent pid and rss of every process. */
  @VisibleForTesting
  public Process buildPsProcess() throws IOException {
    return new ProcessBuilder("ps", "-e", "-o", "pid,ppid,rss").start();
  }

  /**
   * Collects worker metrics. If the last collection happened less than {@code interval} ago, the
   * previously collected metrics are returned instead of collecting again.
   */
  public ImmutableList<WorkerMetric> collectMetrics(Duration interval) {
    Instant now = Instant.ofEpochMilli(clock.currentTimeMillis());
    // Read the shared field once: clear() may reset it to null between two separate reads.
    MetricsWithTime cached = lastMetrics;
    if (cached != null && Duration.between(cached.time, now).compareTo(interval) < 0) {
      return cached.metrics;
    }
    return collectMetrics();
  }

  /**
   * Collects metrics for every registered worker and remembers them as the latest result.
   *
   * <p>Workers whose process could not be measured are reported with zero memory and {@code
   * isMeasurable == false}, and are then dropped from the collector's bookkeeping.
   */
  // TODO(wilwell): add exception if we couldn't collect the metrics.
  public ImmutableList<WorkerMetric> collectMetrics() {
    MemoryCollectionResult memoryCollectionResult =
        collectMemoryUsageByPid(
            OS.getCurrent(),
            workerIdToWorkerProperties.values().stream()
                .map(WorkerMetric.WorkerProperties::getProcessId)
                .collect(toImmutableSet()));

    ImmutableMap<Long, Integer> pidToMemoryInKb = memoryCollectionResult.pidToMemoryInKb;
    Instant collectionTime = memoryCollectionResult.collectionTime;

    ImmutableList.Builder<WorkerMetric> workerMetrics = new ImmutableList.Builder<>();
    List<Integer> nonMeasurableWorkerIds = new ArrayList<>();
    for (WorkerMetric.WorkerProperties workerProperties : workerIdToWorkerProperties.values()) {
      Long pid = workerProperties.getProcessId();
      Integer workerId = workerProperties.getWorkerId();
      boolean isMeasurable = pidToMemoryInKb.containsKey(pid);

      WorkerStat workerStats =
          WorkerStat.create(
              pidToMemoryInKb.getOrDefault(pid, 0),
              workerLastCallTime.get(workerId),
              collectionTime);
      workerMetrics.add(WorkerMetric.create(workerProperties, workerStats, isMeasurable));

      if (!isMeasurable) {
        nonMeasurableWorkerIds.add(workerId);
      }
    }

    // Drop both pieces of bookkeeping for unmeasurable workers so that neither map accumulates
    // stale entries; registerWorker repopulates both if the worker is used again.
    workerIdToWorkerProperties.keySet().removeAll(nonMeasurableWorkerIds);
    workerLastCallTime.keySet().removeAll(nonMeasurableWorkerIds);

    return updateLastCollectMetrics(workerMetrics.build(), collectionTime).metrics;
  }

  /** Forgets all registered workers and the cached result of the last collection. */
  public void clear() {
    workerIdToWorkerProperties.clear();
    workerLastCallTime.clear();
    lastMetrics = null;
  }

  @VisibleForTesting
  public Map<Integer, WorkerMetric.WorkerProperties> getWorkerIdToWorkerProperties() {
    return workerIdToWorkerProperties;
  }

  @VisibleForTesting
  public Map<Integer, Instant> getWorkerLastCallTime() {
    return workerLastCallTime;
  }

  /**
   * Initializes workerIdToWorkerProperties for workers. If worker properties already exist for
   * this worker, only updates workerLastCallTime.
   */
  public void registerWorker(WorkerMetric.WorkerProperties properties) {
    int workerId = properties.getWorkerId();
    workerIdToWorkerProperties.putIfAbsent(workerId, properties);
    workerLastCallTime.put(workerId, Instant.ofEpochMilli(clock.currentTimeMillis()));
  }

  /** Stores the given metrics as the latest collection result and returns the stored value. */
  private synchronized MetricsWithTime updateLastCollectMetrics(
      ImmutableList<WorkerMetric> metrics, Instant time) {
    MetricsWithTime updated = new MetricsWithTime(metrics, time);
    lastMetrics = updated;
    return updated;
  }

  /** Immutable pair of collected metrics and the time they were collected. */
  private static class MetricsWithTime {
    public final ImmutableList<WorkerMetric> metrics;
    public final Instant time;

    public MetricsWithTime(ImmutableList<WorkerMetric> metrics, Instant time) {
      this.metrics = metrics;
      this.time = time;
    }
  }

  /** Immutable pair of per-pid memory usage (in kilobytes) and the time it was collected. */
  static class MemoryCollectionResult {
    public final ImmutableMap<Long, Integer> pidToMemoryInKb;
    public final Instant collectionTime;

    public MemoryCollectionResult(
        ImmutableMap<Long, Integer> pidToMemoryInKb, Instant collectionTime) {
      this.pidToMemoryInKb = pidToMemoryInKb;
      this.collectionTime = collectionTime;
    }
  }

  // TODO(b/238416583) Add deregister function
}