Windows,Bazel client: multithreaded file writing

The Bazel client on Windows now writes extracted
binaries to disk in parallel. On all other systems
it writes them serially (as before).

This change makes blaze.cc:ActuallyExtractData()
about 3x faster when using a HDD. (In previous
experiments I saw no speedup with multi-threaded
writing on machines with an SSD.)

The Windows-specific code uses the native
Threadpool API of Windows, creating a pool of at
least 8 and at most 16 threads. (This seems to be
a good balance between speed and thread count.)

The OS manages everything about the pool; Bazel
submits callbacks and the pool executes them
asynchronously.

blaze.cc:ActuallyExtractData() speed, before:
- Windows: 6.48s (avg) / 6.38s (median)
- Linux (Debian): 4.78s (avg) / 4.79s (median)

blaze.cc:ActuallyExtractData() speed, after:
- Windows (8-16 threads): 2.05s (avg) / 2.01s (md)
- Windows (1 thread): 5.77s (avg) / 5.74s (median)

See https://github.com/bazelbuild/bazel/issues/5444

Change-Id: I7211f3d28eb8b9837352c16ff8df0411d5a9ebe1

Closes #5600.

Change-Id: I7a74d62a563c92948a4dfa8ad5ac83eae018db10
PiperOrigin-RevId: 204891217
diff --git a/src/main/cpp/blaze.cc b/src/main/cpp/blaze.cc
index 09760fc..c84c6d4 100644
--- a/src/main/cpp/blaze.cc
+++ b/src/main/cpp/blaze.cc
@@ -878,8 +878,9 @@
 // A PureZipExtractorProcessor to extract the files from the blaze zip.
 class ExtractBlazeZipProcessor : public PureZipExtractorProcessor {
  public:
-  explicit ExtractBlazeZipProcessor(const string &embedded_binaries)
-      : embedded_binaries_(embedded_binaries) {}
+  explicit ExtractBlazeZipProcessor(const string &embedded_binaries,
+                                    blaze::embedded_binaries::Dumper *dumper)
+      : embedded_binaries_(embedded_binaries), dumper_(dumper) {}
 
   bool AcceptPure(const char *filename,
                   const devtools_ijar::u4 attr) const override {
@@ -892,29 +893,13 @@
 
   void Process(const char *filename, const devtools_ijar::u4 attr,
                const devtools_ijar::u1 *data, const size_t size) override {
-    string path = blaze_util::JoinPath(embedded_binaries_, filename);
-    string dirname = blaze_util::Dirname(path);
-    // Performance optimization: memoize the paths we already created a
-    // directory for, to spare a stat in attempting to recreate an already
-    // existing directory. This optimization alone shaves off seconds from the
-    // extraction time on Windows.
-    if (created_directories_.insert(dirname).second) {
-      if (!blaze_util::MakeDirectories(dirname, 0777)) {
-        BAZEL_DIE(blaze_exit_code::INTERNAL_ERROR)
-            << "couldn't create '" << path << "': " << GetLastErrorString();
-      }
-    }
-
-    if (!blaze_util::WriteFile(data, size, path, 0755)) {
-      BAZEL_DIE(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR)
-          << "Failed to write zipped file '" << path
-          << "': " << GetLastErrorString();
-    }
+    dumper_->Dump(data, size,
+                  blaze_util::JoinPath(embedded_binaries_, filename));
   }
 
  private:
   const string embedded_binaries_;
-  set<string> created_directories_;
+  blaze::embedded_binaries::Dumper *dumper_;
 };
 
 // Actually extracts the embedded data files into the tree whose root
@@ -923,7 +908,16 @@
                                 const string &embedded_binaries) {
   std::string install_md5;
   GetInstallKeyFileProcessor install_key_processor(&install_md5);
-  ExtractBlazeZipProcessor extract_blaze_processor(embedded_binaries);
+
+  std::string error;
+  std::unique_ptr<blaze::embedded_binaries::Dumper> dumper(
+      blaze::embedded_binaries::Create(&error));
+  if (dumper == nullptr) {
+    BAZEL_DIE(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR) << error;
+  }
+  ExtractBlazeZipProcessor extract_blaze_processor(embedded_binaries,
+                                                   dumper.get());
+
   CompoundZipProcessor processor({&extract_blaze_processor,
                                   &install_key_processor});
   if (!blaze_util::MakeDirectories(embedded_binaries, 0777)) {
@@ -948,6 +942,11 @@
         << " as a zip file: " << extractor->GetError();
   }
 
+  if (!dumper->Finish(&error)) {
+    BAZEL_DIE(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR)
+        << "Failed to extract embedded binaries: " << error;
+  }
+
   if (install_md5 != globals->install_md5) {
     BAZEL_DIE(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR)
         << "The " << globals->options->product_name << " binary at " << argv0
diff --git a/src/main/cpp/blaze_util_platform.h b/src/main/cpp/blaze_util_platform.h
index 29596df..24a28eb 100644
--- a/src/main/cpp/blaze_util_platform.h
+++ b/src/main/cpp/blaze_util_platform.h
@@ -25,6 +25,50 @@
 
 namespace blaze {
 
+namespace embedded_binaries {
+
+// Dumps embedded binaries that were extracted from the Bazel zip to disk.
+// The platform-specific implementations may use multi-threaded I/O.
+class Dumper {
+ public:
+  // Requests to write the `data` of `size` bytes to disk under `path`.
+  // The actual writing may happen asynchronously.
+  // `path` must be an absolute path. All of its parent directories will be
+  // created.
+  // The caller retains ownership of `data` and may release it immediately after
+  // this method returns.
+  // Callers may call this method repeatedly, but only from the same thread
+  // (this method is not thread-safe).
+  // If writing fails, this method sets a flag in the `Dumper`, and `Finish`
+  // will return false. Subsequent `Dump` calls will have no effect.
+  virtual void Dump(const void* data, const size_t size,
+                    const std::string& path) = 0;
+
+  // Finishes dumping data.
+  //
+  // This method may block in case the Dumper is asynchronous and some async
+  // writes are still in progress.
+  // Subsequent `Dump` calls after this method have no effect.
+  //
+  // Returns true if there were no errors in any of the `Dump` calls.
+  // Returns false if any of the `Dump` calls failed, and if `error` is not
+  // null then puts an error message in `error`.
+  virtual bool Finish(std::string* error) = 0;
+
+  // Destructor. Subclasses should make sure it calls `Finish(nullptr)`.
+  virtual ~Dumper() {}
+
+ protected:
+  Dumper() {}
+};
+
+// Creates a new Dumper. The caller takes ownership of the returned object.
+// Returns nullptr upon failure and puts an error message in `error` (if `error`
+// is not nullptr).
+Dumper* Create(std::string* error = nullptr);
+
+}  // namespace embedded_binaries
+
 struct GlobalVariables;
 
 class SignalHandler {
diff --git a/src/main/cpp/blaze_util_posix.cc b/src/main/cpp/blaze_util_posix.cc
index 0a3daaa..79ebbb0 100644
--- a/src/main/cpp/blaze_util_posix.cc
+++ b/src/main/cpp/blaze_util_posix.cc
@@ -36,6 +36,8 @@
 
 #include <cassert>
 #include <cinttypes>
+#include <set>
+#include <string>
 
 #include "src/main/cpp/blaze_util.h"
 #include "src/main/cpp/global_variables.h"
@@ -54,9 +56,69 @@
 using blaze_exit_code::INTERNAL_ERROR;
 using blaze_util::GetLastErrorString;
 
+using std::set;
 using std::string;
 using std::vector;
 
+namespace embedded_binaries {
+
+class PosixDumper : public Dumper {
+ public:
+  static PosixDumper* Create(string* error);
+  ~PosixDumper() { Finish(nullptr); }
+  void Dump(const void* data, const size_t size, const string& path) override;
+  bool Finish(string* error) override;
+
+ private:
+  PosixDumper() : was_error_(false) {}
+
+  set<string> dir_cache_;
+  string error_msg_;
+  bool was_error_;
+};
+
+Dumper* Create(string* error) { return PosixDumper::Create(error); }
+
+PosixDumper* PosixDumper::Create(string* error) { return new PosixDumper(); }
+
+void PosixDumper::Dump(const void* data, const size_t size,
+                       const string& path) {
+  if (was_error_) {
+    return;
+  }
+
+  string dirname = blaze_util::Dirname(path);
+  // Performance optimization: memoize the paths we already created a
+  // directory for, to spare a stat in attempting to recreate an already
+  // existing directory.
+  if (dir_cache_.insert(dirname).second) {
+    if (!blaze_util::MakeDirectories(dirname, 0777)) {
+      was_error_ = true;
+      string msg = GetLastErrorString();
+      error_msg_ = string("couldn't create '") + path + "': " + msg;
+    }
+  }
+
+  if (was_error_) {
+    return;
+  }
+
+  if (!blaze_util::WriteFile(data, size, path, 0755)) {
+    was_error_ = true;
+    string msg = GetLastErrorString();
+    error_msg_ = string("Failed to write zipped file '") + path + "': " + msg;
+  }
+}
+
+bool PosixDumper::Finish(string* error) {
+  if (was_error_ && error) {
+    *error = error_msg_;
+  }
+  return !was_error_;
+}
+
+}  // namespace embedded_binaries
+
 SignalHandler SignalHandler::INSTANCE;
 
 // The number of the last received signal that should cause the client
diff --git a/src/main/cpp/blaze_util_windows.cc b/src/main/cpp/blaze_util_windows.cc
index 3751a1d..19ce3af 100644
--- a/src/main/cpp/blaze_util_windows.cc
+++ b/src/main/cpp/blaze_util_windows.cc
@@ -27,10 +27,13 @@
 #include <shlobj.h>        // SHGetKnownFolderPath
 
 #include <algorithm>
+#include <atomic>
 #include <cstdio>
 #include <cstdlib>
+#include <mutex>  // NOLINT
+#include <set>
 #include <sstream>
-#include <thread>  // NOLINT (to slience Google-internal linter)
+#include <thread>       // NOLINT (to silence Google-internal linter)
 #include <type_traits>  // static_assert
 #include <vector>
 
@@ -76,6 +79,199 @@
 using std::unique_ptr;
 using std::wstring;
 
+namespace embedded_binaries {
+
+class WindowsDumper : public Dumper {
+ public:
+  static WindowsDumper* Create(string* error);
+  ~WindowsDumper() { Finish(nullptr); }
+  void Dump(const void* data, const size_t size, const string& path) override;
+  bool Finish(string* error) override;
+
+ private:
+  WindowsDumper()
+      : threadpool_(NULL), cleanup_group_(NULL), was_error_(false) {}
+
+  PTP_POOL threadpool_;
+  PTP_CLEANUP_GROUP cleanup_group_;
+  TP_CALLBACK_ENVIRON threadpool_env_;
+  std::mutex dir_cache_lock_;
+  std::set<string> dir_cache_;
+  std::atomic_bool was_error_;
+  string error_msg_;
+};
+
+namespace {
+
+class DumpContext {
+ public:
+  DumpContext(unique_ptr<uint8_t[]> data, const size_t size, const string path,
+              std::mutex* dir_cache_lock, std::set<string>* dir_cache,
+              std::atomic_bool* was_error, string* error_msg);
+  void Run();
+
+ private:
+  void MaybeSignalError(const string& msg);
+
+  unique_ptr<uint8_t[]> data_;
+  const size_t size_;
+  const string path_;
+  std::mutex* dir_cache_lock_;
+  std::set<string>* dir_cache_;
+  std::atomic_bool* was_error_;
+  string* error_msg_;
+};
+
+VOID CALLBACK WorkCallback(_Inout_ PTP_CALLBACK_INSTANCE Instance,
+                           _Inout_opt_ PVOID Context, _Inout_ PTP_WORK Work);
+
+}  // namespace
+
+Dumper* Create(string* error) { return WindowsDumper::Create(error); }
+
+WindowsDumper* WindowsDumper::Create(string* error) {
+  unique_ptr<WindowsDumper> result(new WindowsDumper());
+
+  result->threadpool_ = CreateThreadpool(NULL);
+  if (result->threadpool_ == NULL) {
+    if (error) {
+      string msg = GetLastErrorString();
+      *error = "CreateThreadpool failed: " + msg;
+    }
+    return nullptr;
+  }
+
+  result->cleanup_group_ = CreateThreadpoolCleanupGroup();
+  if (result->cleanup_group_ == NULL) {
+    string msg = GetLastErrorString();
+    CloseThreadpool(result->threadpool_);
+    if (error) {
+      string msg = GetLastErrorString();
+      *error = "CreateThreadpoolCleanupGroup failed: " + msg;
+    }
+    return nullptr;
+  }
+
+  // I (@laszlocsomor) experimented with different thread counts and found that
+  // 8 threads provide a significant advantage over 1 thread, but adding more
+  // threads provides only marginal speedup.
+  SetThreadpoolThreadMaximum(result->threadpool_, 16);
+  SetThreadpoolThreadMinimum(result->threadpool_, 8);
+
+  InitializeThreadpoolEnvironment(&result->threadpool_env_);
+  SetThreadpoolCallbackPool(&result->threadpool_env_, result->threadpool_);
+  SetThreadpoolCallbackCleanupGroup(&result->threadpool_env_,
+                                    result->cleanup_group_, NULL);
+
+  return result.release();  // release pointer ownership
+}
+
+void WindowsDumper::Dump(const void* data, const size_t size,
+                         const string& path) {
+  if (was_error_) {
+    return;
+  }
+
+  unique_ptr<uint8_t[]> data_copy(new uint8_t[size]);
+  memcpy(data_copy.get(), data, size);
+  unique_ptr<DumpContext> ctx(new DumpContext(std::move(data_copy), size, path,
+                                              &dir_cache_lock_, &dir_cache_,
+                                              &was_error_, &error_msg_));
+  PTP_WORK w = CreateThreadpoolWork(WorkCallback, ctx.get(), &threadpool_env_);
+  if (w == NULL) {
+    string err = GetLastErrorString();
+    if (!was_error_.exchange(true)) {
+      // Benign race condition: though we use no locks to access `error_msg_`,
+      // only one thread may ever flip `was_error_` from false to true and enter
+      // the body of this if-clause. Since `was_error_` is the same object as
+      // used by all other threads trying to write to `error_msg_` (see
+      // DumpContext::MaybeSignalError), using it provides adequate mutual
+      // exclusion to write `error_msg_`.
+      error_msg_ = string("WindowsDumper::Dump() couldn't submit work: ") + err;
+    }
+  } else {
+    ctx.release();  // release pointer ownership
+    SubmitThreadpoolWork(w);
+  }
+}
+
+bool WindowsDumper::Finish(string* error) {
+  if (threadpool_ == NULL) {
+    return true;
+  }
+  CloseThreadpoolCleanupGroupMembers(cleanup_group_, FALSE, NULL);
+  CloseThreadpoolCleanupGroup(cleanup_group_);
+  CloseThreadpool(threadpool_);
+  threadpool_ = NULL;
+  cleanup_group_ = NULL;
+  if (was_error_ && error) {
+    // No race condition reading `error_msg_`: all worker threads terminated
+    // by now.
+    *error = error_msg_;
+  }
+  return !was_error_;
+}
+
+namespace {
+
+DumpContext::DumpContext(unique_ptr<uint8_t[]> data, const size_t size,
+                         const string path, std::mutex* dir_cache_lock,
+                         std::set<string>* dir_cache,
+                         std::atomic_bool* was_error, string* error_msg)
+    : data_(std::move(data)),
+      size_(size),
+      path_(path),
+      dir_cache_lock_(dir_cache_lock),
+      dir_cache_(dir_cache),
+      was_error_(was_error),
+      error_msg_(error_msg) {}
+
+void DumpContext::Run() {
+  string dirname = blaze_util::Dirname(path_);
+
+  bool success = true;
+  // Performance optimization: memoize the paths we already created a
+  // directory for, to spare a stat in attempting to recreate an already
+  // existing directory. This optimization alone shaves off seconds from the
+  // extraction time on Windows.
+  {
+    std::lock_guard<std::mutex> guard(*dir_cache_lock_);
+    if (dir_cache_->insert(dirname).second) {
+      success = blaze_util::MakeDirectories(dirname, 0777);
+    }
+  }
+
+  if (!success) {
+    MaybeSignalError(string("Couldn't create directory '") + dirname + "'");
+    return;
+  }
+
+  if (!blaze_util::WriteFile(data_.get(), size_, path_, 0755)) {
+    MaybeSignalError(string("Failed to write zipped file '") + path_ + "'");
+  }
+}
+
+void DumpContext::MaybeSignalError(const string& msg) {
+  if (!was_error_->exchange(true)) {
+    // Benign race condition: though we use no locks to access `error_msg_`,
+    // only one thread may ever flip `was_error_` from false to true and enter
+    // the body of this if-clause. Since `was_error_` is the same object as used
+    // by all other threads and by WindowsDumper::Dump(), using it provides
+    // adequate mutual exclusion to write `error_msg_`.
+    *error_msg_ = msg;
+  }
+}
+
+VOID CALLBACK WorkCallback(_Inout_ PTP_CALLBACK_INSTANCE Instance,
+                           _Inout_opt_ PVOID Context, _Inout_ PTP_WORK Work) {
+  unique_ptr<DumpContext> ctx(reinterpret_cast<DumpContext*>(Context));
+  ctx->Run();
+}
+
+}  // namespace
+
+}  // namespace embedded_binaries
+
 SignalHandler SignalHandler::INSTANCE;
 
 class WindowsClock {