Windows, test wrapper: implement IFStream

Implement a buffered input stream with look-ahead
support.

This class is necessary to support CDATA-encoding
large test logs that would be inconvenient to read
into memory.

Unfortunately, the STL doesn't seem to offer a
buffered file stream object that takes a HANDLE or
FILE*. std::ifstream, which seems to implement a
buffered input stream, can only be constructed
from a narrow-character (char) path. That makes it
impossible to open paths given as WCHAR strings on
Windows, yet long paths must be opened using
WCHAR strings.

See https://github.com/bazelbuild/bazel/issues/5508

Closes #7214.

PiperOrigin-RevId: 232630569
diff --git a/tools/test/windows/tw.cc b/tools/test/windows/tw.cc
index 6ecf855..7ac6768 100644
--- a/tools/test/windows/tw.cc
+++ b/tools/test/windows/tw.cc
@@ -100,6 +100,59 @@
   bazel::windows::AutoHandle output2_;
 };
 
+// Buffered input stream (based on a Windows HANDLE) with peek-ahead support.
+//
+// This class uses two consecutive "pages" where it buffers data from the
+// underlying HANDLE (wrapped in an AutoHandle). Both pages are always loaded
+// with data until there's no more data to read.
+//
+// The "active" page is the one where the read cursor is pointing. The other
+// page is the next one to be read once the client moves the read cursor beyond
+// the end of the active page.
+//
+// The client advances the read cursor with Advance(). When the cursor reaches
+// the end of the active page, the other page becomes the active one (whose data
+// is already buffered), and the old active page is loaded with new data from
+// the underlying file.
+class IFStreamImpl : IFStream {
+ public:
+  // Creates a new IFStream.
+  //
+  // If successful, then takes ownership of the HANDLE in 'handle', and returns
+  // a new IFStream pointer. Otherwise leaves 'handle' alone and returns
+  // nullptr.
+  static IFStream* Create(bazel::windows::AutoHandle* handle,
+                          DWORD max_page_size = 0x100000 /* 1 MB */);
+
+  bool Get(uint8_t* result) const override;
+  bool Advance() override;
+
+ protected:
+  bool PeekN(DWORD n, uint8_t* result) const override;
+
+ private:
+  bazel::windows::AutoHandle handle_;      // Source of the data.
+  const std::unique_ptr<uint8_t[]> data_;  // Both pages, back to back.
+  const DWORD max_page_size_;  // Capacity of each page, in bytes.
+  DWORD page1_size_;  // Bytes currently buffered in the first page.
+  DWORD page2_size_;  // Bytes currently buffered in the second page.
+  DWORD page_end_;    // Index in data_ just past the active page's data.
+  DWORD read_pos_;    // Index in data_ of the byte under the read cursor.
+
+  IFStreamImpl(bazel::windows::AutoHandle* handle,
+               std::unique_ptr<uint8_t[]>&& data, DWORD data_size,
+               DWORD max_page_size)
+      : handle_(handle),  // NOTE(review): confirm this takes ownership.
+        data_(std::move(data)),
+        max_page_size_(max_page_size),
+        page1_size_(data_size > max_page_size ? max_page_size : data_size),
+        page2_size_(data_size > max_page_size ? data_size - max_page_size : 0),
+        page_end_(page1_size_),  // Listed in declaration order.
+        read_pos_(0) {}
+
+  bool Page1Active() const { return read_pos_ < max_page_size_; }
+};
+
 // A lightweight path abstraction that stores a Unicode Windows path.
 //
 // The class allows extracting the underlying path as a (immutable) string so
@@ -1719,6 +1772,100 @@
   return result;
 }
 
+IFStream* IFStreamImpl::Create(bazel::windows::AutoHandle* handle,
+                               DWORD max_page_size) {
+  std::unique_ptr<uint8_t[]> data(new uint8_t[max_page_size * 2]);
+  DWORD read;  // Number of bytes prefetched into the two pages.
+  if (!ReadFile(*handle, data.get(), max_page_size * 2, &read, NULL)) {
+    DWORD err = GetLastError();
+    if (err == ERROR_BROKEN_PIPE) {
+      read = 0;  // Not an error: the stream simply starts out empty.
+    } else {
+      LogErrorWithValue(__LINE__, "Failed to read from file", err);
+      return nullptr;
+    }
+  }
+  return new IFStreamImpl(handle, std::move(data), read, max_page_size);
+}
+
+bool IFStreamImpl::Get(uint8_t* result) const {
+  if (read_pos_ < page_end_) {  // The cursor is on a buffered byte.
+    *result = data_[read_pos_];
+    return true;
+  } else {
+    return false;  // Nothing buffered under the cursor; *result is untouched.
+  }
+}
+
+bool IFStreamImpl::Advance() {
+  if (read_pos_ + 1 < page_end_) {  // Fast path: stay in the active page.
+    read_pos_++;
+    return true;
+  }
+  const bool page1_was_active = Page1Active();
+  // The new page should have already been loaded when we started reading the
+  // current one (or it was filled by the Create method). Its size should only
+  // be zero if we reached EOF.
+  if ((page1_was_active && page2_size_ == 0) ||
+      (!page1_was_active && page1_size_ == 0)) {
+    return false;
+  }
+  // Overwrite the *active* page, because read_pos_ is about to move out of it
+  // and the current inactive page will be the new active one.
+  if (!ReadFile(handle_,
+                page1_was_active ? data_.get() : (data_.get() + max_page_size_),
+                max_page_size_, page1_was_active ? &page1_size_ : &page2_size_,
+                NULL)) {
+    DWORD err = GetLastError();
+    if (err == ERROR_BROKEN_PIPE) {
+      // The stream is reading from a pipe, and there's no more data.
+      if (page1_was_active) {
+        page1_size_ = 0;
+      } else {
+        page2_size_ = 0;
+      }
+    } else {
+      LogErrorWithValue(__LINE__, "Failed to read from file", err);
+      return false;  // The cursor is left where it was.
+    }
+  }
+  page_end_ = page1_was_active ? max_page_size_ + page2_size_ : page1_size_;
+  read_pos_ = page1_was_active ? max_page_size_ : 0;  // Start of new page.
+  return true;
+}
+
+bool IFStreamImpl::PeekN(DWORD n, uint8_t* result) const {
+  if (n > 3) {
+    // We only need to support peeking at up to 3 bytes. The theoretical upper
+    // limit is max_page_size_ * 2 - 1, because the buffer can hold at most
+    // max_page_size_ * 2 bytes of data and peeking starts at the next byte.
+    return false;
+  }
+
+  if (page_end_ - read_pos_ > n) {
+    // The active page alone holds all n bytes that follow the cursor.
+    for (DWORD i = 0; i < n; ++i) {
+      result[i] = data_[read_pos_ + 1 + i];
+    }
+    return true;
+  }
+  DWORD required_from_next_page = n - (page_end_ - 1 - read_pos_);
+  // Check that the next page has enough data.
+  if ((Page1Active() && page2_size_ < required_from_next_page) ||
+      (!Page1Active() && page1_size_ < required_from_next_page)) {
+    // Pages are loaded eagerly by Advance(). The only way the next page's size
+    // can be zero is if we reached EOF.
+    return false;
+  }
+  for (DWORD i = 0, pos = read_pos_ + 1; i < n; ++i, ++pos) {
+    if (pos == page_end_) {  // Continue at the start of the other page.
+      pos = Page1Active() ? max_page_size_ : 0;
+    }
+    result[i] = data_[pos];
+  }
+  return true;
+}
+
 }  // namespace
 
 void ZipEntryPaths::Create(const std::string& root,
@@ -1902,6 +2049,11 @@
          CdataEscapeAndAppend(input_path, output);
 }
 
+IFStream* TestOnly_CreateIFStream(bazel::windows::AutoHandle* handle,
+                                  DWORD page_size) {
+  return IFStreamImpl::Create(handle, page_size);  // Plain forwarder.
+}
+
 }  // namespace testing
 }  // namespace test_wrapper
 }  // namespace tools
diff --git a/tools/test/windows/tw.h b/tools/test/windows/tw.h
index 05f4a57..1b58ba3 100644
--- a/tools/test/windows/tw.h
+++ b/tools/test/windows/tw.h
@@ -111,6 +111,43 @@
   Tee& operator=(const Tee&) = delete;
 };
 
+// Buffered input stream (based on a HANDLE) with peek-ahead support.
+class IFStream {
+ public:
+  virtual ~IFStream() {}
+
+  // Gets the current byte under the read cursor.
+  // Returns true upon success, returns false if there's no more data to read.
+  virtual bool Get(uint8_t* result) const = 0;
+
+  // Advances the read cursor one byte ahead. May fetch data from the underlying
+  // HANDLE.
+  // Returns true if the cursor could be moved. Returns false if EOF was reached
+  // or if there was an I/O error.
+  virtual bool Advance() = 0;
+
+  // Peeks at the next byte after the read cursor. Returns true if there's at
+  // least one more byte in the stream.
+  bool Peek1(uint8_t* result) const { return PeekN(1, result); }
+
+  // Peeks at the next two bytes after the read cursor. Returns true if there
+  // are at least two more bytes in the stream.
+  bool Peek2(uint8_t* result) const { return PeekN(2, result); }
+
+  // Peeks at the next three bytes after the read cursor. Returns true if there
+  // are at least three more bytes in the stream.
+  bool Peek3(uint8_t* result) const { return PeekN(3, result); }
+
+ protected:
+  IFStream() {}
+  IFStream(const IFStream&) = delete;
+  IFStream& operator=(const IFStream&) = delete;
+
+  // Peeks ahead N bytes, writing them to 'result'. Returns true if successful.
+  // The result does not include the byte currently under the read cursor.
+  virtual bool PeekN(DWORD n, uint8_t* result) const = 0;
+};
+
 // The main function of the test wrapper.
 int TestWrapperMain(int argc, wchar_t** argv);
 
@@ -168,6 +205,9 @@
 bool TestOnly_CdataEscapeAndAppend(const std::wstring& abs_input,
                                    const std::wstring& abs_output);
 
+IFStream* TestOnly_CreateIFStream(bazel::windows::AutoHandle* handle,
+                                  DWORD page_size);
+
 }  // namespace testing
 
 }  // namespace test_wrapper
diff --git a/tools/test/windows/tw_test.cc b/tools/test/windows/tw_test.cc
index f9c53ce..b2263a8 100644
--- a/tools/test/windows/tw_test.cc
+++ b/tools/test/windows/tw_test.cc
@@ -35,10 +35,12 @@
 namespace {
 
 using bazel::tools::test_wrapper::FileInfo;
+using bazel::tools::test_wrapper::IFStream;
 using bazel::tools::test_wrapper::ZipEntryPaths;
 using bazel::tools::test_wrapper::testing::TestOnly_AsMixedPath;
 using bazel::tools::test_wrapper::testing::TestOnly_CdataEncodeBuffer;
 using bazel::tools::test_wrapper::testing::TestOnly_CdataEscapeAndAppend;
+using bazel::tools::test_wrapper::testing::TestOnly_CreateIFStream;
 using bazel::tools::test_wrapper::testing::TestOnly_CreateTee;
 using bazel::tools::test_wrapper::testing::
     TestOnly_CreateUndeclaredOutputsAnnotations;
@@ -601,4 +603,221 @@
             "]]>]]&gt;<![CDATA[");
 }
 
+void CreateIFStreamForData(const std::string& data,
+                           std::unique_ptr<IFStream>* result, DWORD page_size) {
+  std::wstring tmpdir;
+  GET_TEST_TMPDIR(&tmpdir);
+  std::wstring filename = tmpdir + L"\\tmp" + WLINE;
+  EXPECT_TRUE(blaze_util::CreateDummyFile(filename, data));
+
+  bazel::windows::AutoHandle read(CreateFileW(  // Reopen the file read-only.
+      filename.c_str(), GENERIC_READ, FILE_SHARE_READ | FILE_SHARE_DELETE, NULL,
+      OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, NULL));
+  EXPECT_TRUE(read.IsValid());
+
+  result->reset(TestOnly_CreateIFStream(&read, page_size));
+  EXPECT_NE(nullptr, result->get());  // Stream creation must have succeeded.
+}
+
+TEST_F(TestWrapperWindowsTest, TestIFStreamNoData) {
+  std::unique_ptr<IFStream> s;
+  uint8_t buf[3] = {0, 0, 0};
+
+  CreateIFStreamForData("", &s, 6);  // No data at all, page size is 6 bytes.
+  ASSERT_FALSE(s->Get(buf));
+  ASSERT_FALSE(s->Advance());
+  ASSERT_FALSE(s->Peek1(buf));
+  ASSERT_FALSE(s->Peek2(buf));
+  ASSERT_FALSE(s->Peek3(buf));
+}
+
+TEST_F(TestWrapperWindowsTest, TestIFStreamLessDataThanPageSize) {
+  std::unique_ptr<IFStream> s;
+  uint8_t buf[3] = {0, 0, 0};
+
+  // The data is "abc" (3 bytes), page size is 6 bytes.
+  CreateIFStreamForData("abc", &s, 6);
+
+  // Read position is at "a".
+  ASSERT_TRUE(s->Get(buf));
+  ASSERT_EQ(buf[0], 'a');
+  ASSERT_TRUE(s->Peek1(buf));
+  ASSERT_EQ(buf[0], 'b');
+  ASSERT_TRUE(s->Peek2(buf));
+  ASSERT_EQ(buf[0], 'b');
+  ASSERT_EQ(buf[1], 'c');
+  ASSERT_FALSE(s->Peek3(buf));
+  ASSERT_TRUE(s->Advance());
+
+  // Read position is at "b".
+  ASSERT_TRUE(s->Get(buf));
+  ASSERT_EQ(buf[0], 'b');
+  ASSERT_TRUE(s->Peek1(buf));
+  ASSERT_EQ(buf[0], 'c');
+  ASSERT_FALSE(s->Peek2(buf));
+  ASSERT_FALSE(s->Peek3(buf));
+  ASSERT_TRUE(s->Advance());
+
+  // Read position is at "c". No more peeking or moving.
+  ASSERT_TRUE(s->Get(buf));
+  ASSERT_EQ(buf[0], 'c');
+  ASSERT_FALSE(s->Peek1(buf));
+  ASSERT_FALSE(s->Peek2(buf));
+  ASSERT_FALSE(s->Peek3(buf));
+  ASSERT_FALSE(s->Advance());
+}
+
+TEST_F(TestWrapperWindowsTest, TestIFStreamExactlySinglePageSize) {
+  std::unique_ptr<IFStream> s;
+  uint8_t buf[3] = {0, 0, 0};
+
+  // The data is "abcdef" (6 bytes), page size is 6 bytes.
+  CreateIFStreamForData("abcdef", &s, 6);
+
+  // Read position is at "a".
+  ASSERT_TRUE(s->Get(buf));
+  ASSERT_EQ(buf[0], 'a');
+  ASSERT_TRUE(s->Peek1(buf));
+  ASSERT_EQ(buf[0], 'b');
+  ASSERT_TRUE(s->Peek2(buf));
+  ASSERT_EQ(buf[0], 'b');
+  ASSERT_EQ(buf[1], 'c');
+  ASSERT_TRUE(s->Peek3(buf));
+  ASSERT_EQ(buf[0], 'b');
+  ASSERT_EQ(buf[1], 'c');
+  ASSERT_EQ(buf[2], 'd');
+  ASSERT_TRUE(s->Advance());
+
+  // Read position is at "b". Nothing to test here, move to "c".
+  ASSERT_TRUE(s->Advance());
+
+  // Read position is at "c". Last position where we can Peek3.
+  ASSERT_TRUE(s->Get(buf));
+  ASSERT_EQ(buf[0], 'c');
+  ASSERT_TRUE(s->Peek1(buf));
+  ASSERT_EQ(buf[0], 'd');
+  ASSERT_TRUE(s->Peek2(buf));
+  ASSERT_EQ(buf[0], 'd');
+  ASSERT_EQ(buf[1], 'e');
+  ASSERT_TRUE(s->Peek3(buf));
+  ASSERT_EQ(buf[0], 'd');
+  ASSERT_EQ(buf[1], 'e');
+  ASSERT_EQ(buf[2], 'f');
+  ASSERT_TRUE(s->Advance());
+
+  // Read position is at "d". Last position where we can Peek2.
+  ASSERT_TRUE(s->Get(buf));
+  ASSERT_EQ(buf[0], 'd');
+  ASSERT_TRUE(s->Peek1(buf));
+  ASSERT_EQ(buf[0], 'e');
+  ASSERT_TRUE(s->Peek2(buf));
+  ASSERT_EQ(buf[0], 'e');
+  ASSERT_EQ(buf[1], 'f');
+  ASSERT_FALSE(s->Peek3(buf));
+  ASSERT_TRUE(s->Advance());
+
+  // Read position is at "e". Last position where we can Peek1.
+  ASSERT_TRUE(s->Get(buf));
+  ASSERT_EQ(buf[0], 'e');
+  ASSERT_TRUE(s->Peek1(buf));
+  ASSERT_EQ(buf[0], 'f');
+  ASSERT_FALSE(s->Peek2(buf));
+  ASSERT_FALSE(s->Peek3(buf));
+  ASSERT_TRUE(s->Advance());
+
+  // Read position is at "f", the last byte. No more peeking or moving.
+  ASSERT_TRUE(s->Get(buf));
+  ASSERT_EQ(buf[0], 'f');
+  ASSERT_FALSE(s->Peek1(buf));
+  ASSERT_FALSE(s->Peek2(buf));
+  ASSERT_FALSE(s->Peek3(buf));
+  ASSERT_FALSE(s->Advance());
+}
+
+TEST_F(TestWrapperWindowsTest, TestIFStreamLessDataThanDoublePageSize) {
+  std::unique_ptr<IFStream> s;
+  uint8_t buf[3] = {0, 0, 0};
+
+  CreateIFStreamForData("abcdefghi", &s, 6);  // 9 bytes, page size is 6.
+
+  // Move near the page boundary.
+  while (buf[0] != 'e') {
+    ASSERT_TRUE(s->Advance());
+    ASSERT_TRUE(s->Get(buf));
+  }
+
+  // Read position is at "e". Peek2 and Peek3 will need to read from next page.
+  ASSERT_TRUE(s->Get(buf));
+  ASSERT_EQ(buf[0], 'e');
+  ASSERT_TRUE(s->Peek1(buf));
+  ASSERT_EQ(buf[0], 'f');
+  ASSERT_TRUE(s->Peek2(buf));
+  ASSERT_EQ(buf[0], 'f');
+  ASSERT_EQ(buf[1], 'g');
+  ASSERT_TRUE(s->Peek3(buf));
+  ASSERT_EQ(buf[0], 'f');
+  ASSERT_EQ(buf[1], 'g');
+  ASSERT_EQ(buf[2], 'h');
+  ASSERT_TRUE(s->Advance());
+
+  // Read position is at "f". Keep moving.
+  ASSERT_TRUE(s->Advance());
+  // Read position is at "g". Keep moving.
+  ASSERT_TRUE(s->Advance());
+
+  // Read position is at "h".
+  ASSERT_TRUE(s->Get(buf));
+  ASSERT_EQ(buf[0], 'h');
+  ASSERT_TRUE(s->Peek1(buf));
+  ASSERT_EQ(buf[0], 'i');
+  ASSERT_FALSE(s->Peek2(buf));
+  ASSERT_FALSE(s->Peek3(buf));
+  ASSERT_TRUE(s->Advance());  // Moves to "i", the last byte.
+}
+
+TEST_F(TestWrapperWindowsTest, TestIFStreamLessDataThanTriplePageSize) {
+  std::unique_ptr<IFStream> s;
+  uint8_t buf[3] = {0, 0, 0};
+
+  // Data is 15 bytes, page size is 6 bytes, we'll cross 2 page boundaries.
+  CreateIFStreamForData("abcdefghijklmno", &s, 6);
+
+  // Move near the second page boundary.
+  while (buf[0] != 'k') {
+    ASSERT_TRUE(s->Advance());
+    ASSERT_TRUE(s->Get(buf));
+  }
+
+  // Read position is at "k". Peek2 and Peek3 will need to read from last page.
+  ASSERT_TRUE(s->Get(buf));
+  ASSERT_EQ(buf[0], 'k');
+  ASSERT_TRUE(s->Peek1(buf));
+  ASSERT_EQ(buf[0], 'l');
+  ASSERT_TRUE(s->Peek2(buf));
+  ASSERT_EQ(buf[0], 'l');
+  ASSERT_EQ(buf[1], 'm');
+  ASSERT_TRUE(s->Peek3(buf));
+  ASSERT_EQ(buf[0], 'l');
+  ASSERT_EQ(buf[1], 'm');
+  ASSERT_EQ(buf[2], 'n');
+  ASSERT_TRUE(s->Advance());
+
+  // Move near the end of the last page.
+  while (buf[0] != 'm') {
+    ASSERT_TRUE(s->Advance());
+    ASSERT_TRUE(s->Get(buf));
+  }
+
+  // Read position is at "m".
+  ASSERT_TRUE(s->Get(buf));
+  ASSERT_EQ(buf[0], 'm');
+  ASSERT_TRUE(s->Peek1(buf));
+  ASSERT_EQ(buf[0], 'n');
+  ASSERT_TRUE(s->Peek2(buf));
+  ASSERT_EQ(buf[0], 'n');
+  ASSERT_EQ(buf[1], 'o');
+  ASSERT_FALSE(s->Peek3(buf));
+  ASSERT_TRUE(s->Advance());
+}
+
 }  // namespace