blob: e1d64049b185c4e66da806bbe4947795b6bb4319 [file] [log] [blame]
// Copyright 2020 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.worker;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.BuildMetrics.WorkerMetrics;
import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.BuildMetrics.WorkerMetrics.WorkerStatus;
import com.google.devtools.build.lib.clock.Clock;
import com.google.devtools.build.lib.metrics.CgroupsInfoCollector;
import com.google.devtools.build.lib.metrics.PsInfoCollector;
import com.google.devtools.build.lib.metrics.ResourceSnapshot;
import com.google.devtools.build.lib.util.OS;
import com.google.devtools.build.lib.worker.WorkerProcessMetricsCollector.WorkerMetricsPublishComparator;
import com.google.devtools.build.lib.worker.WorkerProcessStatus.Status;
import java.time.Instant;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Unit tests for {@link WorkerProcessMetricsCollector}. */
@RunWith(JUnit4.class)
public class WorkerProcessMetricsCollectorTest {
// Mock of the ps-based collector; passed to the collector under test in setUp().
private PsInfoCollector psInfoCollector;
// Mock of the cgroups-based collector; passed to the collector under test in setUp().
private CgroupsInfoCollector cgroupsInfoCollector;
// Mockito spy wrapping the collector under test; assigned in setUp().
private WorkerProcessMetricsCollector spyCollector;
// Manually controlled clock that setUp() installs on the collector.
ManualClock clock = new ManualClock();
/** Creates fresh collector mocks and a spied collector that reads time from the manual clock. */
@Before
public void setUp() {
  psInfoCollector = mock(PsInfoCollector.class);
  cgroupsInfoCollector = mock(CgroupsInfoCollector.class);
  WorkerProcessMetricsCollector realCollector =
      new WorkerProcessMetricsCollector(psInfoCollector, cgroupsInfoCollector);
  spyCollector = spy(realCollector);
  spyCollector.clear();
  spyCollector.setClock(clock);
}
// Worker ids passed to registerWorker and expected back from the reported metrics.
private static final int WORKER_ID_1 = 1;
private static final int WORKER_ID_2 = 2;
private static final int WORKER_ID_3 = 3;
private static final int WORKER_ID_4 = 4;
private static final int WORKER_ID_5 = 5;
// Process ids passed to registerWorker; also used as keys of the stubbed memory-usage maps.
private static final long PROCESS_ID_1 = 100L;
private static final long PROCESS_ID_2 = 200L;
private static final long PROCESS_ID_3 = 300L;
private static final long PROCESS_ID_4 = 400L;
private static final long PROCESS_ID_5 = 500L;
// Worker key hashes passed to registerWorker.
private static final int WORKER_KEY_HASH_1 = 1;
private static final int WORKER_KEY_HASH_2 = 2;
private static final int WORKER_KEY_HASH_3 = 3;
private static final int WORKER_KEY_HASH_4 = 4;
private static final int WORKER_KEY_HASH_5 = 5;
// Action mnemonics passed to registerWorker.
private static final String JAVAC_MNEMONIC = "Javac";
private static final String CPP_COMPILE_MNEMONIC = "CppCompile";
private static final String PROTO_MNEMONIC = "Proto";
/**
 * Asserts that each inspected field of {@code workerMetric} equals the corresponding expected
 * value.
 *
 * @param workerMetric the metrics entry to inspect; asserted to be non-null
 * @param expectedWorkerIds the worker ids the entry must contain, in any order
 * @param expectedProcessId the expected process id
 * @param expectedMnemonic the expected mnemonic
 * @param expectedIsMultiplex the expected multiplex flag
 * @param expectedIsSandboxed the expected sandboxed flag
 * @param expectedWorkerKeyHash the expected worker key hash
 * @param expectedActionsExecuted the expected number of executed actions
 * @param expectedIsMeasurable the expected measurable flag
 * @param expectedLastCallTime the expected last call time; a value must be present
 * @param expectedCollectedTime the expected last collected time, or {@code null} if no collected
 *     time is expected to be present
 */
private void assertWorkerMetricContains(
    WorkerProcessMetrics workerMetric,
    ImmutableList<Integer> expectedWorkerIds,
    Long expectedProcessId,
    String expectedMnemonic,
    boolean expectedIsMultiplex,
    boolean expectedIsSandboxed,
    int expectedWorkerKeyHash,
    int expectedActionsExecuted,
    boolean expectedIsMeasurable,
    Instant expectedLastCallTime,
    Instant expectedCollectedTime) {
  assertThat(workerMetric).isNotNull();
  assertThat(workerMetric.getWorkerIds()).containsExactlyElementsIn(expectedWorkerIds);
  assertThat(workerMetric.getProcessId()).isEqualTo(expectedProcessId);
  assertThat(workerMetric.getMnemonic()).isEqualTo(expectedMnemonic);
  assertThat(workerMetric.isMultiplex()).isEqualTo(expectedIsMultiplex);
  assertThat(workerMetric.isSandboxed()).isEqualTo(expectedIsSandboxed);
  assertThat(workerMetric.getWorkerKeyHash()).isEqualTo(expectedWorkerKeyHash);
  assertThat(workerMetric.getActionsExecuted()).isEqualTo(expectedActionsExecuted);
  assertThat(workerMetric.isMeasurable()).isEqualTo(expectedIsMeasurable);
  // Check presence first so that a missing last call time is reported as an assertion failure
  // instead of as an exception thrown by get().
  assertThat(workerMetric.getLastCallTime().isPresent()).isTrue();
  assertThat(workerMetric.getLastCallTime().get()).isEqualTo(expectedLastCallTime);
  if (expectedCollectedTime == null) {
    assertThat(workerMetric.getLastCollectedTime().isEmpty()).isTrue();
  } else {
    assertThat(workerMetric.getLastCollectedTime().isPresent()).isTrue();
    assertThat(workerMetric.getLastCollectedTime().get()).isEqualTo(expectedCollectedTime);
  }
}
/** Registering two workers with different process ids yields one metrics entry per process. */
@Test
public void testRegisterWorker_insertDifferent() throws Exception {
  // First worker: multiplex, not sandboxed.
  WorkerProcessStatus firstStatus = new WorkerProcessStatus();
  spyCollector.registerWorker(
      WORKER_ID_1,
      PROCESS_ID_1,
      firstStatus,
      JAVAC_MNEMONIC,
      /* isMultiplex= */ true,
      /* isSandboxed= */ false,
      WORKER_KEY_HASH_1,
      /* cgroup= */ null);
  assertThat(spyCollector.getPidToWorkerProcessMetrics().keySet()).containsExactly(PROCESS_ID_1);

  // Second worker: different process, not multiplex, sandboxed.
  WorkerProcessStatus secondStatus = new WorkerProcessStatus();
  spyCollector.registerWorker(
      WORKER_ID_2,
      PROCESS_ID_2,
      secondStatus,
      CPP_COMPILE_MNEMONIC,
      /* isMultiplex= */ false,
      /* isSandboxed= */ true,
      WORKER_KEY_HASH_2,
      /* cgroup= */ null);
  assertThat(spyCollector.getPidToWorkerProcessMetrics().keySet())
      .containsExactly(PROCESS_ID_1, PROCESS_ID_2);

  // Both entries keep exactly the values they were registered with.
  WorkerProcessMetrics firstMetrics =
      spyCollector.getPidToWorkerProcessMetrics().get(PROCESS_ID_1);
  assertWorkerMetricContains(
      firstMetrics,
      ImmutableList.of(WORKER_ID_1),
      PROCESS_ID_1,
      JAVAC_MNEMONIC,
      /* expectedIsMultiplex= */ true,
      /* expectedIsSandboxed= */ false,
      WORKER_KEY_HASH_1,
      /* expectedActionsExecuted= */ 0,
      /* expectedIsMeasurable= */ false,
      /* expectedLastCallTime= */ DEFAULT_CLOCK_START_INSTANT,
      /* expectedCollectedTime= */ null);
  WorkerProcessMetrics secondMetrics =
      spyCollector.getPidToWorkerProcessMetrics().get(PROCESS_ID_2);
  assertWorkerMetricContains(
      secondMetrics,
      ImmutableList.of(WORKER_ID_2),
      PROCESS_ID_2,
      CPP_COMPILE_MNEMONIC,
      /* expectedIsMultiplex= */ false,
      /* expectedIsSandboxed= */ true,
      WORKER_KEY_HASH_2,
      /* expectedActionsExecuted= */ 0,
      /* expectedIsMeasurable= */ false,
      /* expectedLastCallTime= */ DEFAULT_CLOCK_START_INSTANT,
      /* expectedCollectedTime= */ null);
}
/**
 * Registering a second worker id against an already registered process id keeps a single metrics
 * entry that lists both worker ids and carries the later call time.
 */
@Test
public void testRegisterWorker_insertMultiplex() throws Exception {
  WorkerProcessStatus firstStatus = new WorkerProcessStatus();
  spyCollector.registerWorker(
      WORKER_ID_1,
      PROCESS_ID_1,
      firstStatus,
      JAVAC_MNEMONIC,
      /* isMultiplex= */ true,
      /* isSandboxed= */ true,
      WORKER_KEY_HASH_1,
      /* cgroup= */ null);
  assertThat(spyCollector.getPidToWorkerProcessMetrics().keySet()).containsExactly(PROCESS_ID_1);
  WorkerProcessMetrics afterFirstRegistration =
      spyCollector.getPidToWorkerProcessMetrics().get(PROCESS_ID_1);
  assertWorkerMetricContains(
      afterFirstRegistration,
      ImmutableList.of(WORKER_ID_1),
      PROCESS_ID_1,
      JAVAC_MNEMONIC,
      /* expectedIsMultiplex= */ true,
      /* expectedIsSandboxed= */ true,
      WORKER_KEY_HASH_1,
      /* expectedActionsExecuted= */ 0,
      /* expectedIsMeasurable= */ false,
      /* expectedLastCallTime= */ DEFAULT_CLOCK_START_INSTANT,
      /* expectedCollectedTime= */ null);

  // Move the clock forward so the second registration has a distinguishable call time.
  Instant laterTime = DEFAULT_CLOCK_START_INSTANT.plusSeconds(10);
  clock.setTime(laterTime.toEpochMilli());
  WorkerProcessStatus secondStatus = new WorkerProcessStatus();
  spyCollector.registerWorker(
      WORKER_ID_2,
      PROCESS_ID_1,
      secondStatus,
      JAVAC_MNEMONIC,
      /* isMultiplex= */ true,
      /* isSandboxed= */ true,
      WORKER_KEY_HASH_1,
      /* cgroup= */ null);
  assertThat(spyCollector.getPidToWorkerProcessMetrics().keySet()).containsExactly(PROCESS_ID_1);
  WorkerProcessMetrics afterSecondRegistration =
      spyCollector.getPidToWorkerProcessMetrics().get(PROCESS_ID_1);
  assertWorkerMetricContains(
      afterSecondRegistration,
      ImmutableList.of(WORKER_ID_1, WORKER_ID_2),
      PROCESS_ID_1,
      JAVAC_MNEMONIC,
      /* expectedIsMultiplex= */ true,
      /* expectedIsSandboxed= */ true,
      WORKER_KEY_HASH_1,
      /* expectedActionsExecuted= */ 0,
      /* expectedIsMeasurable= */ false,
      /* expectedLastCallTime= */ laterTime,
      /* expectedCollectedTime= */ null);
}
/**
 * Registering the same worker id and process id twice keeps a single entry with a single worker
 * id; only the last call time is expected to move.
 */
@Test
public void testRegisterWorker_insertSame() throws Exception {
  WorkerProcessStatus firstStatus = new WorkerProcessStatus();
  spyCollector.registerWorker(
      WORKER_ID_1,
      PROCESS_ID_1,
      firstStatus,
      JAVAC_MNEMONIC,
      /* isMultiplex= */ true,
      /* isSandboxed= */ true,
      WORKER_KEY_HASH_1,
      /* cgroup= */ null);
  assertThat(spyCollector.getPidToWorkerProcessMetrics().keySet()).containsExactly(PROCESS_ID_1);
  WorkerProcessMetrics afterFirstRegistration =
      spyCollector.getPidToWorkerProcessMetrics().get(PROCESS_ID_1);
  assertWorkerMetricContains(
      afterFirstRegistration,
      ImmutableList.of(WORKER_ID_1),
      PROCESS_ID_1,
      JAVAC_MNEMONIC,
      /* expectedIsMultiplex= */ true,
      /* expectedIsSandboxed= */ true,
      WORKER_KEY_HASH_1,
      /* expectedActionsExecuted= */ 0,
      /* expectedIsMeasurable= */ false,
      /* expectedLastCallTime= */ DEFAULT_CLOCK_START_INSTANT,
      /* expectedCollectedTime= */ null);

  Instant laterTime = DEFAULT_CLOCK_START_INSTANT.plusSeconds(10);
  clock.setTime(laterTime.toEpochMilli());
  // Re-registering the identical worker must not add a second worker id or a second entry.
  WorkerProcessStatus repeatedStatus = new WorkerProcessStatus();
  spyCollector.registerWorker(
      WORKER_ID_1,
      PROCESS_ID_1,
      repeatedStatus,
      JAVAC_MNEMONIC,
      /* isMultiplex= */ true,
      /* isSandboxed= */ true,
      WORKER_KEY_HASH_1,
      /* cgroup= */ null);
  assertThat(spyCollector.getPidToWorkerProcessMetrics().keySet()).containsExactly(PROCESS_ID_1);
  WorkerProcessMetrics afterRepeatedRegistration =
      spyCollector.getPidToWorkerProcessMetrics().get(PROCESS_ID_1);
  assertWorkerMetricContains(
      afterRepeatedRegistration,
      ImmutableList.of(WORKER_ID_1),
      PROCESS_ID_1,
      JAVAC_MNEMONIC,
      /* expectedIsMultiplex= */ true,
      /* expectedIsSandboxed= */ true,
      WORKER_KEY_HASH_1,
      /* expectedActionsExecuted= */ 0,
      /* expectedIsMeasurable= */ false,
      /* expectedLastCallTime= */ laterTime,
      /* expectedCollectedTime= */ null);
}
/**
 * Registers five workers in different states, stubs the resource snapshot so that only two of
 * them have memory data, and checks what collectMetrics() reports for each of them.
 */
@Test
public void testCollectMetrics() throws Exception {
// Worker 1 simulates a measurable worker process that has executed an action.
spyCollector.registerWorker(
WORKER_ID_1,
PROCESS_ID_1,
new WorkerProcessStatus(),
JAVAC_MNEMONIC,
/* isMultiplex= */ true,
/* isSandboxed= */ false,
WORKER_KEY_HASH_1,
/* cgroup= */ null);
spyCollector.onWorkerFinishExecution(PROCESS_ID_1);
// Worker 2 simulates a measurable worker process that has not yet completed execution of any
// actions.
spyCollector.registerWorker(
WORKER_ID_2,
PROCESS_ID_2,
new WorkerProcessStatus(),
CPP_COMPILE_MNEMONIC,
/* isMultiplex= */ false,
/* isSandboxed= */ true,
WORKER_KEY_HASH_2,
/* cgroup= */ null);
// Worker 3 simulates a non-measurable worker that has not executed any actions.
spyCollector.registerWorker(
WORKER_ID_3,
PROCESS_ID_3,
new WorkerProcessStatus(),
PROTO_MNEMONIC,
/* isMultiplex= */ true,
/* isSandboxed= */ true,
WORKER_KEY_HASH_3,
/* cgroup= */ null);
// Worker 4 simulates a non-measurable worker that has executed an action and was killed.
WorkerProcessStatus s4 = new WorkerProcessStatus();
spyCollector.registerWorker(
WORKER_ID_4,
PROCESS_ID_4,
/* status= */ s4,
PROTO_MNEMONIC,
/* isMultiplex= */ true,
/* isSandboxed= */ true,
WORKER_KEY_HASH_4,
/* cgroup= */ null);
spyCollector.onWorkerFinishExecution(PROCESS_ID_4);
s4.maybeUpdateStatus(Status.KILLED_DUE_TO_MEMORY_PRESSURE);
// Worker 5 simulates a non-measurable worker that has executed an action, but was not killed.
WorkerProcessStatus s5 = new WorkerProcessStatus();
spyCollector.registerWorker(
WORKER_ID_5,
PROCESS_ID_5,
/* status= */ s5,
PROTO_MNEMONIC,
/* isMultiplex= */ true,
/* isSandboxed= */ true,
WORKER_KEY_HASH_5,
/* cgroup= */ null);
spyCollector.onWorkerFinishExecution(PROCESS_ID_5);
// Only the processes of workers 1 and 2 appear in the stubbed snapshot; the assertions below
// expect exactly those two workers to be measurable.
ImmutableMap<Long, Integer> memoryUsageMap =
ImmutableMap.of(
PROCESS_ID_1, 1234,
PROCESS_ID_2, 2345);
Instant collectionTime = DEFAULT_CLOCK_START_INSTANT.plusSeconds(10);
ResourceSnapshot resourceSnapshot = ResourceSnapshot.create(memoryUsageMap, collectionTime);
// Stub the no-argument collection on the spy so that no real resource collection is performed.
doReturn(resourceSnapshot).when(spyCollector).collectResourceUsage();
clock.setTime(collectionTime.toEpochMilli());
ImmutableList<WorkerProcessMetrics> metrics = spyCollector.collectMetrics();
// All workers measurable or non-measurable should be reported.
assertThat(metrics.stream().flatMap(m -> m.getWorkerIds().stream()).collect(toImmutableSet()))
.containsExactly(WORKER_ID_1, WORKER_ID_2, WORKER_ID_3, WORKER_ID_4, WORKER_ID_5);
assertWorkerMetricContains(
getWorkerProcessMetricsFromList(WORKER_ID_1, metrics),
ImmutableList.of(WORKER_ID_1),
PROCESS_ID_1,
JAVAC_MNEMONIC,
/* expectedIsMultiplex= */ true,
/* expectedIsSandboxed= */ false,
WORKER_KEY_HASH_1,
/* expectedActionsExecuted= */ 1,
/* expectedIsMeasurable= */ true,
/* expectedLastCallTime= */ DEFAULT_CLOCK_START_INSTANT,
/* expectedCollectedTime= */ collectionTime);
assertWorkerMetricContains(
getWorkerProcessMetricsFromList(WORKER_ID_2, metrics),
ImmutableList.of(WORKER_ID_2),
PROCESS_ID_2,
CPP_COMPILE_MNEMONIC,
/* expectedIsMultiplex= */ false,
/* expectedIsSandboxed= */ true,
WORKER_KEY_HASH_2,
/* expectedActionsExecuted= */ 0,
/* expectedIsMeasurable= */ true,
/* expectedLastCallTime= */ DEFAULT_CLOCK_START_INSTANT,
/* expectedCollectedTime= */ collectionTime);
// Worker 3 is non-measurable and has not executed any actions. It is still reported, but no
// collected time is expected for it.
assertWorkerMetricContains(
getWorkerProcessMetricsFromList(WORKER_ID_3, metrics),
ImmutableList.of(WORKER_ID_3),
PROCESS_ID_3,
PROTO_MNEMONIC,
/* expectedIsMultiplex= */ true,
/* expectedIsSandboxed= */ true,
WORKER_KEY_HASH_3,
/* expectedActionsExecuted= */ 0,
/* expectedIsMeasurable= */ false,
/* expectedLastCallTime= */ DEFAULT_CLOCK_START_INSTANT,
/* expectedCollectedTime= */ null);
// Worker 4 executed one action and is non-measurable; its status was explicitly set to
// KILLED_DUE_TO_MEMORY_PRESSURE before collection.
assertWorkerMetricContains(
getWorkerProcessMetricsFromList(WORKER_ID_4, metrics),
ImmutableList.of(WORKER_ID_4),
PROCESS_ID_4,
PROTO_MNEMONIC,
/* expectedIsMultiplex= */ true,
/* expectedIsSandboxed= */ true,
WORKER_KEY_HASH_4,
/* expectedActionsExecuted= */ 1,
/* expectedIsMeasurable= */ false,
/* expectedLastCallTime= */ DEFAULT_CLOCK_START_INSTANT,
/* expectedCollectedTime= */ null);
assertWorkerMetricContains(
getWorkerProcessMetricsFromList(WORKER_ID_5, metrics),
ImmutableList.of(WORKER_ID_5),
PROCESS_ID_5,
PROTO_MNEMONIC,
/* expectedIsMultiplex= */ true,
/* expectedIsSandboxed= */ true,
WORKER_KEY_HASH_5,
/* expectedActionsExecuted= */ 1,
/* expectedIsMeasurable= */ false,
/* expectedLastCallTime= */ DEFAULT_CLOCK_START_INSTANT,
/* expectedCollectedTime= */ null);
// Worker 5's status should have been updated to killed_unknown, because it had executed actions
// but is now non-measurable.
assertThat(s5.get()).isEqualTo(Status.KILLED_UNKNOWN);
}
/** On Windows neither stubbed collector's data is expected; the snapshot must be empty. */
@Test
public void testCollectResourceUsage_windows() {
  Instant collectionTime = DEFAULT_CLOCK_START_INSTANT.plusSeconds(10);
  clock.setTime(collectionTime.toEpochMilli());
  // Give both collectors distinguishable data so a wrong source would show up in the result.
  ResourceSnapshot psSnapshot =
      ResourceSnapshot.create(ImmutableMap.of(PROCESS_ID_1, 1000), collectionTime);
  ResourceSnapshot cgroupsSnapshot =
      ResourceSnapshot.create(ImmutableMap.of(PROCESS_ID_1, 2000), collectionTime);
  when(psInfoCollector.collectResourceUsage(any(), any())).thenReturn(psSnapshot);
  when(cgroupsInfoCollector.collectResourceUsage(any(), any())).thenReturn(cgroupsSnapshot);

  ResourceSnapshot actual =
      spyCollector.collectResourceUsage(OS.WINDOWS, ImmutableSet.of(PROCESS_ID_1));

  // On non-linux and non-darwin, it should always return an empty snapshot.
  ResourceSnapshot expected = ResourceSnapshot.create(ImmutableMap.of(), collectionTime);
  assertThat(actual).isEqualTo(expected);
}
/** On Darwin the snapshot is expected to carry the ps collector's data, not the cgroups data. */
@Test
public void testCollectResourceUsage_darwin_usingPs() {
  Instant collectionTime = DEFAULT_CLOCK_START_INSTANT.plusSeconds(10);
  clock.setTime(collectionTime.toEpochMilli());
  // Give both collectors distinguishable data (ps: 1000, cgroups: 2000) so the assertion below
  // identifies which source was used.
  when(psInfoCollector.collectResourceUsage(any(), any()))
      .thenReturn(ResourceSnapshot.create(ImmutableMap.of(PROCESS_ID_1, 1000), collectionTime));
  when(cgroupsInfoCollector.collectResourceUsage(any(), any()))
      .thenReturn(ResourceSnapshot.create(ImmutableMap.of(PROCESS_ID_1, 2000), collectionTime));
  ResourceSnapshot snapshot =
      spyCollector.collectResourceUsage(OS.DARWIN, ImmutableSet.of(PROCESS_ID_1));
  // Should return the ps information rather than the cgroup information.
  assertThat(snapshot)
      .isEqualTo(ResourceSnapshot.create(ImmutableMap.of(PROCESS_ID_1, 1000), collectionTime));
}
/** On Linux with cgroups disabled, the snapshot is expected to carry the ps collector's data. */
@Test
public void testCollectResourceUsage_linux_usingPs() {
  spyCollector.setUseCgroupsOnLinux(/* useCgroupsOnLinux= */ false);
  Instant collectionTime = DEFAULT_CLOCK_START_INSTANT.plusSeconds(10);
  clock.setTime(collectionTime.toEpochMilli());
  // Give both collectors distinguishable data so the source of the result is identifiable.
  ResourceSnapshot psSnapshot =
      ResourceSnapshot.create(ImmutableMap.of(PROCESS_ID_1, 1000), collectionTime);
  ResourceSnapshot cgroupsSnapshot =
      ResourceSnapshot.create(ImmutableMap.of(PROCESS_ID_1, 2000), collectionTime);
  when(psInfoCollector.collectResourceUsage(any(), any())).thenReturn(psSnapshot);
  when(cgroupsInfoCollector.collectResourceUsage(any(), any())).thenReturn(cgroupsSnapshot);

  ResourceSnapshot actual =
      spyCollector.collectResourceUsage(OS.LINUX, ImmutableSet.of(PROCESS_ID_1));

  // Should return the ps information rather than the cgroup information.
  ResourceSnapshot expected =
      ResourceSnapshot.create(ImmutableMap.of(PROCESS_ID_1, 1000), collectionTime);
  assertThat(actual).isEqualTo(expected);
}
/** On Linux with cgroups enabled, the snapshot is expected to carry the cgroups data. */
@Test
public void testCollectResourceUsage_linux_usingCgroups() {
  spyCollector.setUseCgroupsOnLinux(/* useCgroupsOnLinux= */ true);
  Instant collectionTime = DEFAULT_CLOCK_START_INSTANT.plusSeconds(10);
  clock.setTime(collectionTime.toEpochMilli());
  // Give both collectors distinguishable data so the source of the result is identifiable.
  ResourceSnapshot psSnapshot =
      ResourceSnapshot.create(ImmutableMap.of(PROCESS_ID_1, 1000), collectionTime);
  ResourceSnapshot cgroupsSnapshot =
      ResourceSnapshot.create(ImmutableMap.of(PROCESS_ID_1, 2000), collectionTime);
  when(psInfoCollector.collectResourceUsage(any(), any())).thenReturn(psSnapshot);
  when(cgroupsInfoCollector.collectResourceUsage(any(), any())).thenReturn(cgroupsSnapshot);

  ResourceSnapshot actual =
      spyCollector.collectResourceUsage(OS.LINUX, ImmutableSet.of(PROCESS_ID_1));

  // Should return the cgroup information rather than the ps information.
  ResourceSnapshot expected =
      ResourceSnapshot.create(ImmutableMap.of(PROCESS_ID_1, 2000), collectionTime);
  assertThat(actual).isEqualTo(expected);
}
/**
 * Checks the ordering produced by {@link WorkerMetricsPublishComparator}: status priority first,
 * then memory usage within the same priority.
 *
 * <p>Only the sign of each comparison is asserted, because the {@code Comparator} contract does
 * not fix the magnitude of the returned value.
 */
@Test
public void testWorkerMetricsPublishComparator_compare() {
  WorkerMetrics alive1 = newWorkerMetrics(1, WorkerStatus.ALIVE, 100);
  WorkerMetrics alive2 = newWorkerMetrics(2, WorkerStatus.ALIVE, 200);
  WorkerMetrics evicted1 = newWorkerMetrics(3, WorkerStatus.KILLED_DUE_TO_MEMORY_PRESSURE, 100);
  WorkerMetrics evicted2 = newWorkerMetrics(4, WorkerStatus.KILLED_DUE_TO_MEMORY_PRESSURE, 200);
  WorkerMetrics others1 = newWorkerMetrics(5, WorkerStatus.KILLED_UNKNOWN, 100);
  WorkerMetrics others2 =
      newWorkerMetrics(6, WorkerStatus.KILLED_DUE_TO_USER_EXEC_EXCEPTION, 200);
  WorkerMetricsPublishComparator comparator = new WorkerMetricsPublishComparator();
  // WorkerMetrics of the same status priority should be compared by their memory usage (higher
  // gets prioritized), so the lower-memory entry compares as greater.
  assertThat(comparator.compare(alive1, alive2)).isGreaterThan(0);
  assertThat(comparator.compare(evicted1, evicted2)).isGreaterThan(0);
  assertThat(comparator.compare(others1, others2)).isGreaterThan(0);
  // WorkerMetrics should be first compared by their status priorities rather than their memory
  // usage.
  assertThat(comparator.compare(alive1, evicted2)).isLessThan(0);
  assertThat(comparator.compare(evicted1, others2)).isLessThan(0);
  assertThat(comparator.compare(others2, alive1)).isGreaterThan(0);
}
/**
 * Checks which entries limitWorkerMetricsToPublish keeps when the input exceeds the limit, and
 * that everything is kept when the input is within the limit.
 */
@Test
public void testLimitWorkerMetricsToPublish() {
  WorkerMetrics alive1 = newWorkerMetrics(1, WorkerStatus.ALIVE, 200);
  WorkerMetrics alive2 = newWorkerMetrics(2, WorkerStatus.ALIVE, 100);
  WorkerMetrics evicted3 = newWorkerMetrics(3, WorkerStatus.KILLED_DUE_TO_MEMORY_PRESSURE, 100);
  WorkerMetrics evicted4 = newWorkerMetrics(4, WorkerStatus.KILLED_DUE_TO_MEMORY_PRESSURE, 200);
  WorkerMetrics others5 = newWorkerMetrics(5, WorkerStatus.KILLED_UNKNOWN, 200);
  WorkerMetrics others6 =
      newWorkerMetrics(6, WorkerStatus.KILLED_DUE_TO_USER_EXEC_EXCEPTION, 100);

  // Based on prioritization and then sorted by worker id. Note that containsExactly without
  // inOrder() only checks membership, not the order of the returned entries.
  ImmutableList<WorkerMetrics> sixEntries =
      ImmutableList.of(alive1, alive2, evicted3, evicted4, others5, others6);
  assertThat(WorkerProcessMetricsCollector.limitWorkerMetricsToPublish(sixEntries, 3))
      .containsExactly(alive1, alive2, evicted4);

  ImmutableList<WorkerMetrics> fourEntries =
      ImmutableList.of(alive1, evicted4, others5, others6);
  assertThat(WorkerProcessMetricsCollector.limitWorkerMetricsToPublish(fourEntries, 3))
      .containsExactly(alive1, evicted4, others5);

  // If under the limit, it should just report everything.
  ImmutableList<WorkerMetrics> underLimit = ImmutableList.of(alive1, alive2, evicted4, others6);
  assertThat(WorkerProcessMetricsCollector.limitWorkerMetricsToPublish(underLimit, 10))
      .containsExactly(alive1, alive2, evicted4, others6);
}
/**
 * Builds a {@link WorkerMetrics} message holding one worker id, the given status, and a single
 * stats entry that carries the given memory usage.
 */
private WorkerMetrics newWorkerMetrics(int id, WorkerStatus status, int memoryInKb) {
  WorkerMetrics.WorkerStats memoryStats =
      WorkerMetrics.WorkerStats.newBuilder().setWorkerMemoryInKb(memoryInKb).build();
  WorkerMetrics.Builder metricsBuilder = WorkerMetrics.newBuilder();
  metricsBuilder.addWorkerIds(id);
  metricsBuilder.setWorkerStatus(status);
  metricsBuilder.addWorkerStats(memoryStats);
  return metricsBuilder.build();
}
/**
 * Returns the first entry of {@code metrics} whose worker ids contain {@code workerId}.
 *
 * @param workerId the worker id to look for
 * @param metrics the collected metrics to search
 * @return the first matching entry
 * @throws AssertionError if no entry contains {@code workerId}, so that a missing worker is
 *     reported with a descriptive message
 */
private WorkerProcessMetrics getWorkerProcessMetricsFromList(
    int workerId, ImmutableList<WorkerProcessMetrics> metrics) {
  return metrics.stream()
      .filter(wm -> wm.getWorkerIds().contains(workerId))
      .findFirst()
      .orElseThrow(
          () -> new AssertionError("no worker process metrics found for worker id " + workerId));
}
// Initial time of ManualClock, in epoch milliseconds.
private static final long DEFAULT_CLOCK_START_TIME = 0L;
// The same initial time as an Instant, for use in expectations.
private static final Instant DEFAULT_CLOCK_START_INSTANT =
Instant.ofEpochMilli(DEFAULT_CLOCK_START_TIME);
/**
 * {@link Clock} whose millisecond time changes only when a test calls {@link #setTime}. It
 * starts at {@code DEFAULT_CLOCK_START_TIME}; {@link #nanoTime} is not supported.
 */
private static class ManualClock implements Clock {
  private long nowMillis;

  ManualClock() {
    nowMillis = DEFAULT_CLOCK_START_TIME;
  }

  @Override
  public long currentTimeMillis() {
    return nowMillis;
  }

  @Override
  public long nanoTime() {
    // These tests are not expected to read the nano clock; fail loudly if something does.
    throw new AssertionError("unexpected method call");
  }

  /** Sets the value subsequently returned by {@link #currentTimeMillis}. */
  void setTime(long currentTime) {
    nowMillis = currentTime;
  }
}
}