Introduce an Extrema aggregator.

RELNOTES: None
PiperOrigin-RevId: 187370833
diff --git a/src/main/java/com/google/devtools/build/lib/collect/Extrema.java b/src/main/java/com/google/devtools/build/lib/collect/Extrema.java
new file mode 100644
index 0000000..6f0b9cf
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/collect/Extrema.java
@@ -0,0 +1,95 @@
+// Copyright 2018 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.collect;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Comparator;
+import java.util.PriorityQueue;
+
+/**
+ * A stream aggregator that, given a {@code k}, aggregates a sequence of elements into the {@code k}
+ * most extreme.
+ */
+public class Extrema<T extends Comparable<T>> {
+  private final int k;
+  private final Comparator<T> extremaComparator;
+  private final PriorityQueue<T> priorityQueue;
+
+  /**
+   * Creates an {@link Extrema} that aggregates a sequence of elements into the {@code k} smallest.
+   */
+  public static <T extends Comparable<T>> Extrema<T> min(int k) {
+    return new Extrema<>(k, Comparator.<T>naturalOrder());
+  }
+
+  /**
+   * Creates an {@link Extrema} that aggregates a sequence of elements into the {@code k} largest.
+   */
+  public static <T extends Comparable<T>> Extrema<T> max(int k) {
+    return new Extrema<>(k, Comparator.<T>naturalOrder().reversed());
+  }
+
+  /**
+   * @param k the number of extreme elements to compute; must not be negative
+   * @param extremaComparator a comparator such that {@code extremaComparator(a, b) < 0} iff
+   *        {@code a} is more extreme than {@code b}
+   */
+  private Extrema(int k, Comparator<T> extremaComparator) {
+    if (k < 0) {
+      throw new IllegalArgumentException("k must not be negative: " + k);
+    }
+    this.k = k;
+    this.extremaComparator = extremaComparator;
+    // Our implementation strategy is to keep a priority queue of the k most extreme elements
+    // encountered, ordered backwards; this way we have constant-time access to the least extreme
+    // among these elements. PriorityQueue rejects capacities below 1, hence the clamp for k == 0.
+    this.priorityQueue = new PriorityQueue<>(Math.max(k, 1), extremaComparator.reversed());
+  }
+
+  /**
+   * Aggregates the given element.
+   *
+   * <p>See {@link #getExtremeElements()}.
+   */
+  public void aggregate(T element) {
+    if (priorityQueue.size() < k) {
+      priorityQueue.add(element);
+    } else if (k > 0 && extremaComparator.compare(element, priorityQueue.peek()) < 0) {
+      // Suppose the least extreme of the current k most extreme elements is e. If the new element
+      // is more extreme than e, then (1) it must be among the new k most extreme and (2) e must
+      // not be. (With k == 0 the queue is empty, so there is no such e to compare against.)
+      priorityQueue.remove();
+      priorityQueue.add(element);
+    }
+  }
+
+  /**
+   * For an {@link Extrema} created with {@code k} and with {@code n} calls to {@link #aggregate}
+   * since the most recent call to {@link #clear}, returns the min(k, n) most extreme elements
+   * {@link #aggregate}'ed since the most recent call to {@link #clear}. The result is ordered
+   * from most extreme to least extreme.
+   */
+  public ImmutableList<T> getExtremeElements() {
+    return ImmutableList.sortedCopyOf(extremaComparator, priorityQueue);
+  }
+
+  /**
+   * Disregards all the elements {@link #aggregate}'ed already.
+   *
+   * <p>See {@link #getExtremeElements()}.
+   */
+  public void clear() {
+    priorityQueue.clear();
+  }
+}
diff --git a/src/main/java/com/google/devtools/build/lib/packages/Package.java b/src/main/java/com/google/devtools/build/lib/packages/Package.java
index 0430700..7a8b121 100644
--- a/src/main/java/com/google/devtools/build/lib/packages/Package.java
+++ b/src/main/java/com/google/devtools/build/lib/packages/Package.java
@@ -721,7 +721,7 @@
    */
   public static class Builder {
 
-    public static interface Helper {
+    public interface Helper {
       /**
        * Returns a fresh {@link Package} instance that a {@link Builder} will internally mutate
        * during package loading. Called by {@link PackageFactory}.
@@ -730,10 +730,13 @@
 
       /**
        * Called after {@link com.google.devtools.build.lib.skyframe.PackageFunction} is completely
-       * done loading the given {@link Package}. {@code skylarkSemantics} are the semantics used to
-       * evaluate the build.
+       * done loading the given {@link Package}.
+       *
+       * @param pkg the loaded {@link Package}
+       * @param skylarkSemantics the semantics used to load the package
+       * @param loadTimeNanos the wall time, in ns, that it took to load the package
        */
-      void onLoadingComplete(Package pkg, SkylarkSemantics skylarkSemantics);
+      void onLoadingComplete(Package pkg, SkylarkSemantics skylarkSemantics, long loadTimeNanos);
     }
 
     /** {@link Helper} that simply calls the {@link Package} constructor. */
@@ -749,7 +752,8 @@
       }
 
       @Override
-      public void onLoadingComplete(Package pkg, SkylarkSemantics skylarkSemantics) {
+      public void onLoadingComplete(
+          Package pkg, SkylarkSemantics skylarkSemantics, long loadTimeNanos) {
       }
     }
 
diff --git a/src/main/java/com/google/devtools/build/lib/packages/PackageFactory.java b/src/main/java/com/google/devtools/build/lib/packages/PackageFactory.java
index daf3dfe..3f3ad09 100644
--- a/src/main/java/com/google/devtools/build/lib/packages/PackageFactory.java
+++ b/src/main/java/com/google/devtools/build/lib/packages/PackageFactory.java
@@ -390,7 +390,7 @@
    * Constructs a {@code PackageFactory} instance with a specific glob path translator
    * and rule factory.
    *
-   * <p>Only intended to be called by BlazeRuntime or {@link FactoryForTesting#create}.
+   * <p>Only intended to be called by BlazeRuntime or {@link BuilderForTesting#build}.
    *
    * <p>Do not call this constructor directly in tests; please use
    * TestConstants#PACKAGE_FACTORY_BUILDER_FACTORY_FOR_TESTING instead.
@@ -1617,8 +1617,9 @@
    * Called by a caller of {@link #createPackageFromAst} after this caller has fully
    * loaded the package.
    */
-  public void afterDoneLoadingPackage(Package pkg, SkylarkSemantics skylarkSemantics) {
-    packageBuilderHelper.onLoadingComplete(pkg, skylarkSemantics);
+  public void afterDoneLoadingPackage(
+      Package pkg, SkylarkSemantics skylarkSemantics, long loadTimeNanos) {
+    packageBuilderHelper.onLoadingComplete(pkg, skylarkSemantics, loadTimeNanos);
   }
 
   /**
diff --git a/src/main/java/com/google/devtools/build/lib/profiler/BUILD b/src/main/java/com/google/devtools/build/lib/profiler/BUILD
index bb1231e..e0a8dfb 100644
--- a/src/main/java/com/google/devtools/build/lib/profiler/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/profiler/BUILD
@@ -16,6 +16,7 @@
         "//src/main/java/com/google/devtools/build/lib:base-util",
         "//src/main/java/com/google/devtools/build/lib:os_util",
         "//src/main/java/com/google/devtools/build/lib/clock",
+        "//src/main/java/com/google/devtools/build/lib/collect",
         "//src/main/java/com/google/devtools/build/lib/concurrent",
         "//src/main/java/com/google/devtools/build/lib/shell",
         "//src/main/java/com/google/devtools/common/options",
diff --git a/src/main/java/com/google/devtools/build/lib/profiler/Profiler.java b/src/main/java/com/google/devtools/build/lib/profiler/Profiler.java
index d453967..b58c21d 100644
--- a/src/main/java/com/google/devtools/build/lib/profiler/Profiler.java
+++ b/src/main/java/com/google/devtools/build/lib/profiler/Profiler.java
@@ -20,6 +20,7 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.devtools.build.lib.clock.Clock;
+import com.google.devtools.build.lib.collect.Extrema;
 import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadCompatible;
 import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
 import com.google.devtools.build.lib.profiler.PredicateBasedStatRecorder.RecorderAndPredicate;
@@ -36,7 +37,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.PriorityQueue;
 import java.util.Queue;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -332,51 +332,40 @@
   /**
    * Aggregator class that keeps track of the slowest tasks of the specified type.
    *
-   * <p><code>priorityQueues</p> is sharded so that all threads need not compete for the same
-   * lock if they do the same operation at the same time. Access to the individual queues is
-   * synchronized on the queue objects themselves.
+   * <p>{@code extremaAggregators} is sharded so that all threads need not compete for the same
+   * lock if they do the same operation at the same time. Access to an individual {@link Extrema}
+   * is synchronized on the {@link Extrema} instance itself.
    */
   private static final class SlowestTaskAggregator {
     private static final int SHARDS = 16;
     private final int size;
 
     @SuppressWarnings({"unchecked", "rawtypes"})
-    private final PriorityQueue<SlowTask>[] priorityQueues = new PriorityQueue[SHARDS];
+    private final Extrema<SlowTask>[] extremaAggregators = new Extrema[SHARDS];
 
     SlowestTaskAggregator(int size) {
       this.size = size;
 
       for (int i = 0; i < SHARDS; i++) {
-        priorityQueues[i] = new PriorityQueue<>(size + 1);
+        extremaAggregators[i] = Extrema.max(size);
       }
     }
 
     // @ThreadSafe
     void add(TaskData taskData) {
-      PriorityQueue<SlowTask> queue =
-          priorityQueues[(int) (Thread.currentThread().getId() % SHARDS)];
-      synchronized (queue) {
-        if (queue.size() == size) {
-          // Optimization: check if we are faster than the fastest element. If we are, we would
-          // be the ones to fall off the end of the queue, therefore, we can safely return early.
-          if (queue.peek().getDurationNanos() > taskData.duration) {
-            return;
-          }
-
-          queue.add(new SlowTask(taskData));
-          queue.remove();
-        } else {
-          queue.add(new SlowTask(taskData));
-        }
+      Extrema<SlowTask> extrema =
+          extremaAggregators[(int) (Thread.currentThread().getId() % SHARDS)];
+      synchronized (extrema) {
+        extrema.aggregate(new SlowTask(taskData));
       }
     }
 
     // @ThreadSafe
     void clear() {
       for (int i = 0; i < SHARDS; i++) {
-        PriorityQueue<SlowTask> queue = priorityQueues[i];
-        synchronized (queue) {
-          queue.clear();
+        Extrema<SlowTask> extrema = extremaAggregators[i];
+        synchronized (extrema) {
+          extrema.clear();
         }
       }
     }
@@ -384,19 +373,16 @@
     // @ThreadSafe
     Iterable<SlowTask> getSlowestTasks() {
       // This is slow, but since it only happens during the end of the invocation, it's OK
-      PriorityQueue<SlowTask> merged = new PriorityQueue<>(size * SHARDS);
+      // Each shard holds at most `size` tasks; merging them with k = size keeps only the overall
+      // `size` slowest, matching the trimming that the previous PriorityQueue-based code did.
+      Extrema<SlowTask> mergedExtrema = Extrema.max(size);
       for (int i = 0; i < SHARDS; i++) {
-        PriorityQueue<SlowTask> queue = priorityQueues[i];
-        synchronized (queue) {
-          merged.addAll(queue);
+        Extrema<SlowTask> extrema = extremaAggregators[i];
+        synchronized (extrema) {
+          extrema.getExtremeElements().forEach(mergedExtrema::aggregate);
         }
       }
-
-      while (merged.size() > size) {
-        merged.remove();
-      }
-
-      return merged;
+      return mergedExtrema.getExtremeElements();
     }
   }
 
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/PackageFunction.java b/src/main/java/com/google/devtools/build/lib/skyframe/PackageFunction.java
index af54d08..bfc2a35 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/PackageFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/PackageFunction.java
@@ -25,6 +25,7 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import com.google.devtools.build.lib.clock.BlazeClock;
 import com.google.devtools.build.lib.cmdline.Label;
 import com.google.devtools.build.lib.cmdline.LabelSyntaxException;
 import com.google.devtools.build.lib.cmdline.PackageIdentifier;
@@ -472,7 +473,11 @@
     }
 
     if (packageFactory != null) {
-      packageFactory.afterDoneLoadingPackage(pkg, skylarkSemantics);
+      packageFactory.afterDoneLoadingPackage(
+          pkg,
+          skylarkSemantics,
+          // This is a lie: the real load time isn't tracked here, so 0 is a placeholder.
+          /*loadTimeNanos=*/0L);
     }
     return new PackageValue(pkg);
   }
@@ -576,6 +581,7 @@
     List<Statement> preludeStatements =
         astLookupValue.lookupSuccessful()
             ? astLookupValue.getAST().getStatements() : ImmutableList.<Statement>of();
+    long startTimeNanos = BlazeClock.nanoTime();
     BuilderAndGlobDeps packageBuilderAndGlobDeps =
         loadPackage(
             workspaceName,
@@ -588,6 +594,7 @@
             preludeStatements,
             packageLookupValue.getRoot(),
             env);
+    long loadTimeNanos = Math.max(BlazeClock.nanoTime() - startTimeNanos, 0L);
     if (packageBuilderAndGlobDeps == null) {
       return null;
     }
@@ -642,7 +649,7 @@
     // We know this SkyFunction will not be called again, so we can remove the cache entry.
     packageFunctionCache.invalidate(packageId);
 
-    packageFactory.afterDoneLoadingPackage(pkg, skylarkSemantics);
+    packageFactory.afterDoneLoadingPackage(pkg, skylarkSemantics, loadTimeNanos);
     return new PackageValue(pkg);
   }