Skip to content

Commit

Permalink
Merge pull request microsoft#7228 from ubsan/parallel-file-ops
Browse files Browse the repository at this point in the history
Parallel file operations
  • Loading branch information
strega-nil authored Jul 24, 2019
2 parents 265921b + 2c20a9d commit 36dea3d
Show file tree
Hide file tree
Showing 34 changed files with 951 additions and 107 deletions.
9 changes: 7 additions & 2 deletions toolsrc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ if(GCC OR CLANG)
endif()

file(GLOB_RECURSE VCPKGLIB_SOURCES src/vcpkg/*.cpp)
file(GLOB_RECURSE VCPKGTEST_SOURCES src/vcpkg-tests/*.cpp)
file(GLOB_RECURSE VCPKGTEST_SOURCES src/vcpkg-test/*.cpp)

if (DEFINE_DISABLE_METRICS)
set(DISABLE_METRICS_VALUE "1")
Expand All @@ -52,7 +52,11 @@ add_executable(vcpkg-test EXCLUDE_FROM_ALL ${VCPKGTEST_SOURCES} ${VCPKGLIB_SOURC
target_compile_definitions(vcpkg-test PRIVATE -DDISABLE_METRICS=${DISABLE_METRICS_VALUE})
target_include_directories(vcpkg-test PRIVATE include)

foreach(TEST_NAME arguments chrono dependencies paragraph plan specifier supports)
foreach(TEST_NAME
arguments chrono dependencies files
paragraph plan specifier statusparagraphs
strings supports update
)
add_test(${TEST_NAME} vcpkg-test [${TEST_NAME}])
endforeach()

Expand Down Expand Up @@ -91,3 +95,4 @@ endif()
set(THREADS_PREFER_PTHREAD_FLAG ON)
find_package(Threads REQUIRED)
target_link_libraries(vcpkg PRIVATE Threads::Threads)
target_link_libraries(vcpkg-test PRIVATE Threads::Threads)
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <vcpkg/base/files.h>
#include <vcpkg/statusparagraph.h>

#include <memory>
Expand Down Expand Up @@ -30,4 +31,12 @@ T&& unwrap(vcpkg::Optional<T>&& opt)
return std::move(*opt.get());
}

extern const bool SYMLINKS_ALLOWED;

extern const fs::path TEMPORARY_DIRECTORY;

void create_symlink(const fs::path& file, const fs::path& target, std::error_code& ec);

void create_directory_symlink(const fs::path& file, const fs::path& target, std::error_code& ec);

}
57 changes: 53 additions & 4 deletions toolsrc/include/vcpkg/base/files.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,60 @@ namespace fs
using stdfs::file_status;
using stdfs::file_type;
using stdfs::path;
using stdfs::perms;
using stdfs::u8path;

inline bool is_regular_file(file_status s) { return stdfs::is_regular_file(s); }
inline bool is_directory(file_status s) { return stdfs::is_directory(s); }
inline bool is_symlink(file_status s) { return stdfs::is_symlink(s); }
/*
std::experimental::filesystem's file_status and file_type are broken in
the presence of symlinks -- a symlink is treated as the object it points
to for `symlink_status` and `symlink_type`
*/

using stdfs::status;

// we want to poison ADL with these niebloids

namespace detail
{
struct symlink_status_t
{
file_status operator()(const path& p, std::error_code& ec) const noexcept;
file_status operator()(const path& p, vcpkg::LineInfo li) const noexcept;
};
struct is_symlink_t
{
inline bool operator()(file_status s) const { return stdfs::is_symlink(s); }
};
struct is_regular_file_t
{
inline bool operator()(file_status s) const { return stdfs::is_regular_file(s); }
};
struct is_directory_t
{
inline bool operator()(file_status s) const { return stdfs::is_directory(s); }
};
}

constexpr detail::symlink_status_t symlink_status{};
constexpr detail::is_symlink_t is_symlink{};
constexpr detail::is_regular_file_t is_regular_file{};
constexpr detail::is_directory_t is_directory{};
}

/*
if someone attempts to use unqualified `symlink_status` or `is_symlink`,
they might get the ADL version, which is broken.
Therefore, put `symlink_status` in the global namespace, so that they get
our symlink_status.
We also want to poison the ADL on is_regular_file and is_directory, because
we don't want people calling these functions on paths
*/
using fs::is_directory;
using fs::is_regular_file;
using fs::is_symlink;
using fs::symlink_status;

namespace vcpkg::Files
{
struct Filesystem
Expand All @@ -44,7 +91,9 @@ namespace vcpkg::Files
std::error_code& ec) = 0;
bool remove(const fs::path& path, LineInfo linfo);
virtual bool remove(const fs::path& path, std::error_code& ec) = 0;
virtual std::uintmax_t remove_all(const fs::path& path, std::error_code& ec) = 0;

virtual std::uintmax_t remove_all(const fs::path& path, std::error_code& ec, fs::path& failure_point) = 0;
std::uintmax_t remove_all(const fs::path& path, LineInfo li);
virtual bool exists(const fs::path& path) const = 0;
virtual bool is_directory(const fs::path& path) const = 0;
virtual bool is_regular_file(const fs::path& path) const = 0;
Expand Down
5 changes: 5 additions & 0 deletions toolsrc/include/vcpkg/base/strings.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,4 +184,9 @@ namespace vcpkg::Strings
const char* search(StringView haystack, StringView needle);

bool contains(StringView haystack, StringView needle);

// base 32 encoding, since base64 encoding requires lowercase letters,
// which are not distinct from uppercase letters on macOS or Windows filesystems.
// follows RFC 4648
std::string b32_encode(std::uint64_t x) noexcept;
}
230 changes: 230 additions & 0 deletions toolsrc/include/vcpkg/base/work_queue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
#pragma once

#include <condition_variable>
#include <memory>
#include <queue>

namespace vcpkg
{
template<class Action, class ThreadLocalData>
struct WorkQueue;

namespace detail
{
// for SFINAE purposes, keep out of the class
template<class Action, class ThreadLocalData>
auto call_moved_action(Action& action,
const WorkQueue<Action, ThreadLocalData>& work_queue,
ThreadLocalData& tld) -> decltype(static_cast<void>(std::move(action)(tld, work_queue)))
{
std::move(action)(tld, work_queue);
}

template<class Action, class ThreadLocalData>
auto call_moved_action(Action& action, const WorkQueue<Action, ThreadLocalData>&, ThreadLocalData& tld)
-> decltype(static_cast<void>(std::move(action)(tld)))
{
std::move(action)(tld);
}
}

template<class Action, class ThreadLocalData>
struct WorkQueue
{
template<class F>
WorkQueue(std::uint16_t num_threads, LineInfo li, const F& tld_init) noexcept
{
m_line_info = li;

set_unjoined_workers(num_threads);
m_threads.reserve(num_threads);
for (std::size_t i = 0; i < num_threads; ++i)
{
m_threads.push_back(std::thread(Worker{this, tld_init()}));
}
}

WorkQueue(WorkQueue const&) = delete;
WorkQueue(WorkQueue&&) = delete;

~WorkQueue()
{
auto lck = std::unique_lock<std::mutex>(m_mutex);
if (!is_joined(m_state))
{
Checks::exit_with_message(m_line_info, "Failed to call join() on a WorkQueue that was destroyed");
}
}

// should only be called once; anything else is an error
void run(LineInfo li)
{
// this should _not_ be locked before `run()` is called; however, we
// want to terminate if someone screws up, rather than cause UB
auto lck = std::unique_lock<std::mutex>(m_mutex);

if (m_state != State::BeforeRun)
{
Checks::exit_with_message(li, "Attempted to run() twice");
}

m_state = State::Running;
}

// runs all remaining tasks, and blocks on their finishing
// if this is called in an existing task, _will block forever_
// DO NOT DO THAT
// thread-unsafe
void join(LineInfo li)
{
{
auto lck = std::unique_lock<std::mutex>(m_mutex);
if (is_joined(m_state))
{
Checks::exit_with_message(li, "Attempted to call join() more than once");
}
else if (m_state == State::Terminated)
{
m_state = State::TerminatedJoined;
}
else
{
m_state = State::Joined;
}
}

while (unjoined_workers())
{
if (!running_workers())
{
m_cv.notify_one();
}
}

// wait for all threads to join
for (auto& thrd : m_threads)
{
thrd.join();
}
}

// useful in the case of errors
// doesn't stop any existing running tasks
// returns immediately, so that one can call this in a task
void terminate() const
{
{
auto lck = std::unique_lock<std::mutex>(m_mutex);
if (is_joined(m_state))
{
m_state = State::TerminatedJoined;
}
else
{
m_state = State::Terminated;
}
}
m_cv.notify_all();
}

void enqueue_action(Action a) const
{
{
auto lck = std::unique_lock<std::mutex>(m_mutex);
m_actions.push_back(std::move(a));

if (m_state == State::BeforeRun) return;
}
m_cv.notify_one();
}

private:
struct Worker
{
const WorkQueue* work_queue;
ThreadLocalData tld;

void operator()()
{
// unlocked when waiting, or when in the action
// locked otherwise
auto lck = std::unique_lock<std::mutex>(work_queue->m_mutex);

work_queue->m_cv.wait(lck, [&] { return work_queue->m_state != State::BeforeRun; });

work_queue->increment_running_workers();
for (;;)
{
const auto state = work_queue->m_state;

if (is_terminated(state))
{
break;
}

if (work_queue->m_actions.empty())
{
if (state == State::Running || work_queue->running_workers() > 1)
{
work_queue->decrement_running_workers();
work_queue->m_cv.wait(lck);
work_queue->increment_running_workers();
continue;
}

// the queue is joining, and we are the only worker running
// no more work!
break;
}

Action action = std::move(work_queue->m_actions.back());
work_queue->m_actions.pop_back();

lck.unlock();
work_queue->m_cv.notify_one();
detail::call_moved_action(action, *work_queue, tld);
lck.lock();
}

work_queue->decrement_running_workers();
work_queue->decrement_unjoined_workers();
}
};

enum class State : std::int16_t
{
// can only exist upon construction
BeforeRun = -1,

Running,
Joined,
Terminated,
TerminatedJoined,
};

static bool is_terminated(State st) { return st == State::Terminated || st == State::TerminatedJoined; }

static bool is_joined(State st) { return st == State::Joined || st == State::TerminatedJoined; }

mutable std::mutex m_mutex{};
// these are all under m_mutex
mutable State m_state = State::BeforeRun;
mutable std::vector<Action> m_actions{};
mutable std::condition_variable m_cv{};

mutable std::atomic<std::uint32_t> m_workers;
// = unjoined_workers << 16 | running_workers

void set_unjoined_workers(std::uint16_t threads) { m_workers = std::uint32_t(threads) << 16; }
void decrement_unjoined_workers() const { m_workers -= 1 << 16; }

std::uint16_t unjoined_workers() const { return std::uint16_t(m_workers >> 16); }

void increment_running_workers() const { ++m_workers; }
void decrement_running_workers() const { --m_workers; }
std::uint16_t running_workers() const { return std::uint16_t(m_workers); }

std::vector<std::thread> m_threads{};
LineInfo m_line_info;
};
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#include <vcpkg-tests/catch.h>
#include <vcpkg-test/catch.h>

#include <vcpkg/vcpkgcmdarguments.h>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#define CATCH_CONFIG_RUNNER
#include <vcpkg-tests/catch.h>
#include <vcpkg-test/catch.h>

#include <vcpkg/base/system.debug.h>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#include <vcpkg-tests/catch.h>
#include <vcpkg-test/catch.h>

#include <vcpkg/base/chrono.h>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#include <vcpkg-tests/catch.h>
#include <vcpkg-test/catch.h>

#include <vcpkg/sourceparagraph.h>

Expand Down
Loading

0 comments on commit 36dea3d

Please sign in to comment.