diff --git a/src/fdb5/tools/fdb-hammer.cc b/src/fdb5/tools/fdb-hammer.cc index 4187ea704..402e71a69 100644 --- a/src/fdb5/tools/fdb-hammer.cc +++ b/src/fdb5/tools/fdb-hammer.cc @@ -17,9 +17,11 @@ #include "eckit/config/LocalConfiguration.h" #include "eckit/config/Resource.h" +#include "eckit/filesystem/LocalPathName.h" #include "eckit/io/DataHandle.h" #include "eckit/io/EmptyHandle.h" #include "eckit/io/FileDescHandle.h" +#include "eckit/io/FileLock.h" #include "eckit/io/MemoryHandle.h" #include "eckit/io/StdFile.h" #include "eckit/log/TimeStamp.h" @@ -58,16 +60,16 @@ namespace fdb5::tools { namespace { -// Valid type=pf,levtype=pl parameters. This allows for up to 174 parameters. -// Note that not all paramids are valid for encoding GRIB data with all combinations of type, levtype. -// This provides a set which can be used for providing a range corresponding to --nparams. - enum class CheckType : long { NONE = 0, MD_CHECK, FULL_CHECK }; +// Valid type=pf,levtype=pl parameters. This allows for up to 174 parameters. +// Note that not all paramids are valid for encoding GRIB data with all combinations of type, levtype. +// This provides a set which can be used for providing a range corresponding to --nparams. + const std::vector VALID_PARAMS{ 1, 2, 3, 4, 5, 6, 7, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 32, 33, 35, 36, 37, 38, 39, 40, 41, 42, 43, 46, 53, 54, 60, 62, 63, @@ -111,6 +113,332 @@ Key shortFDBKey(const Q& step, const R& member, const S& level, const T& param) //---------------------------------------------------------------------------------------------------------------------- +/// @note: implements a multi-process multi-node barrier with FIFOs and TCP sockets. + +/// @note: this barrier implementation has been found to be slow in the intra-node +/// synchronisation phase if the node is very busy performing an intensive +/// POSIX I/O workload. + +class Barrier { + +public: + + Barrier(const std::vector& nodes, size_t ppn, int port, int max_wait, bool report); + + void finalise(); + void barrier(); + +private: + + void init(); + void electLeaderProcess(); + void barrierInternode(); + void barrierIntranode(); + +private: + + const std::vector& nodes_; + size_t ppn_; + int port_; + int maxWait_; + + bool report_; + std::string hostname_; + + eckit::PathName waitFifo_; + eckit::PathName barrierFifo_; + eckit::PathName pidFile_; + eckit::PathName lockFile_; + std::vector serverConnections_; + eckit::net::TCPSocket clientConnection_; + std::unique_ptr timer_; + + bool isLeaderProcess_ = false; + bool isLeaderNode_ = false; + bool init_ = false; +}; + +Barrier::Barrier(const std::vector& nodes, size_t ppn, int port, int max_wait, bool report) : + nodes_(nodes), + ppn_(ppn), + port_(port), + maxWait_(max_wait), + report_(report), + hostname_(Main::hostname()), + serverConnections_(nodes_.size() ? (nodes_.size() - 1) : 0) { + + ASSERT(nodes_.size() > 0); + + if (nodes_.size() == 1) + ASSERT(hostname_ == nodes_[0]); + + uid_t uid = ::getuid(); + eckit::Translator uid_to_str; + eckit::PathName default_run_path{"/var/run/user"}; + default_run_path /= uid_to_str(uid); + + eckit::PathName run_path(eckit::Resource("$FDB_BARRIER_RUN_PATH", default_run_path)); + + waitFifo_ = run_path / "barrier.wait.fifo"; + + barrierFifo_ = run_path / "barrier.release.fifo"; + + lockFile_ = run_path / "barrier.lock"; + pidFile_ = run_path / "barrier.pid"; + + timer_ = std::make_unique(); +} + +void Barrier::electLeaderProcess() { + + /// create an application-unique lock file if not exists, and lock it + eckit::FileLock flock{lockFile_}; + flock.lock(); + + /// create an application-unique PID file if not exists + pidFile_.touch(); + + /// the leader PID is read from the file + std::unique_ptr fh(pidFile_.fileHandle()); + eckit::HandleStream hs{*fh}; + long pid = 0; + eckit::Length size = fh->openForRead(); + { + eckit::AutoClose closer(*fh); + if (size > eckit::Length(0)) + hs >> pid; + } + pid_t leader_pid = (long)pid; + + /// if no PID is found or the PID does not exist, this process becomes the leader + if (size == eckit::Length(0) || ::kill(leader_pid, 0) != 0) { + + isLeaderProcess_ = true; + + /// a pair of FIFOs are created. One for clients to communicate the leader they are + /// waiting, and another to open in blocking mode until leader opens it for write + /// once it has synchronised with peer nodes + if (waitFifo_.exists()) + waitFifo_.unlink(); + SYSCALL(::mkfifo(waitFifo_.localPath(), 0666)); + + if (barrierFifo_.exists()) + barrierFifo_.unlink(); + SYSCALL(::mkfifo(barrierFifo_.localPath(), 0666)); + + /// the leader PID is written into the file + LocalPathName{pidFile_}.truncate(eckit::Length(0)); + ASSERT(fh->isEmpty()); + eckit::HandleStream hs_write{*fh}; + fh->openForWrite(eckit::Length(sizeof(long))); + { + eckit::AutoClose closer(*fh); + hs_write << (long)::getpid(); + } + } + + /// unlock the PID file + flock.unlock(); +} + +void Barrier::init() { + + if (init_) + return; + + electLeaderProcess(); + + // if this process is the intranode leader process and there are multiple nodes, + // establish TCP connections between all intranode leader processes. + + if (!isLeaderProcess_ || nodes_.size() == 1) { + init_ = true; + return; + } + + if (hostname_ == nodes_[0]) { + + // this is the leader node + isLeaderNode_ = true; + + // accept as many connections as peer nodes, on the designated port. + + eckit::net::TCPServer server(port_); + + for (int i = 0; i < serverConnections_.size(); ++i) { + serverConnections_[i] = server.accept("Waiting for connection", maxWait_); + } + + // the TCPServer is auto-closed + } + else { + + /// @note if the server is not yet listening, the client connection wil fail and will be retried every second + /// until timeout. This could be adjusted to retry more frequently. + + eckit::net::TCPClient client; + clientConnection_ = client.connect(nodes_[0], port_, maxWait_, 0, 1); + } + + init_ = true; +} + +void Barrier::finalise() { + + if (init_ && isLeaderProcess_) { + waitFifo_.unlink(false); + barrierFifo_.unlink(false); + pidFile_.unlink(false); + lockFile_.unlink(false); + } + + if (report_) + timer_->report("Barrier time"); + timer_ = std::make_unique(); +} + +void Barrier::barrier() { + + timer_->start(); + + if (!init_) + init(); + + // If there is only one process per node, we don't need the intra-node barrier + if (ppn_ == 1) + return barrierInternode(); + + barrierIntranode(); + + timer_->stop(); +} + +/// implementation of internode barrier using TCP sockets. +void Barrier::barrierInternode() { + + if (hostname_ == nodes_[0]) { + + // this is the leader node + + // send barrier end signal to all clients. + + const char message[] = {'E', 'N', 'D'}; + for (auto& socket : serverConnections_) { + socket.write(&message[0], sizeof(message)); + socket.closeOutput(); /// ensure all data is sent + } + } + else { + + // This is NOT the leader node + + // Wait for barrier end + + std::array message; + long read = clientConnection_.read(message.data(), message.size()); + ASSERT(::memcmp("END", message.data(), message.size()) == 0); + } +} + +void Barrier::barrierIntranode() { + + if (isLeaderProcess_) { + + /// a signal from each peer process in the node is received + std::array message = {'S', 'I', 'G'}; + std::unique_ptr fh_wait(waitFifo_.fileHandle()); + /// TODO: handle errors and no-replies on open as well as on read and unlink + fh_wait->openForRead(); + { + eckit::AutoClose closer(*fh_wait); + size_t pending = ppn_ - 1; + while (pending > 0) { + message = {'0', '0', '0'}; + ASSERT(fh_wait->read(&message[0], message.size()) == message.size()); + ASSERT(std::string(message.begin(), message.end()) == "SIG"); + --pending; + } + } + + /// once all processes in the node are ready, barrier with peer nodes + std::exception_ptr eptr; + try { + eckit::Timer barrier_timer; + barrier_timer.start(); + barrierInternode(); + barrier_timer.stop(); + // barrier_timer.reset("Inter-node barrier"); + } + catch (...) { + eptr = std::current_exception(); + } + + /// once all nodes are ready, open the barrier fifo for write which will + /// release all waiting peer processes in the node + std::unique_ptr fh_barrier(barrierFifo_.fileHandle()); + /// TODO: handle errors and no-replies on open as well as unlink + fh_barrier->openForWrite(eckit::Length(0)); + { + eckit::AutoClose closer(*fh_barrier); + /// if the inter-node barrier failed, notify the clients via the barrier fifo + if (eptr) { + message = {'S', 'I', 'G'}; + size_t pending = ppn_ - 1; + while (pending > 0) { + ASSERT(fh_barrier->write(&message[0], message.size()) == message.size()); + --pending; + } + } + } + + /// throw if the inter-node barrier failed + if (eptr) + std::rethrow_exception(eptr); + } + else { + + /// wait for leader to open barrier FIFO to signal barrier end + auto future = std::async(std::launch::async, [this]() { + try { + /// TODO: handle errors and no-replies on opens as well as on write + /// NOTE: cannot use FileHandle.openForRead as it internally performs + /// an estimate() on the fifo which may have been removed by leader. + int fd; + SYSCALL(fd = ::open(barrierFifo_.localPath(), O_RDONLY)); + eckit::FileDescHandle fh_barrier(fd, true); + + /// check for any errors reported by leader + std::array message = {'0', '0', '0'}; + long bytes = fh_barrier.read(&message[0], message.size()); + if (bytes > 0) { + ASSERT(bytes == message.size()); + ASSERT(std::string(message.begin(), message.end()) == "SIG"); + return false; + } + + /// fd is autoclosed + return true; + } + catch (std::exception& e) { + return false; + } + }); + + /// signal the leader this process is waiting + std::array message = {'S', 'I', 'G'}; + std::unique_ptr fh_wait(waitFifo_.fileHandle()); + // eckit::HandleStream hs_wait{*fh_wait}; + fh_wait->openForWrite(eckit::Length(sizeof(long))); + eckit::AutoClose closer(*fh_wait); + ASSERT(fh_wait->write(&message[0], message.size()) == message.size()); + + if (!future.get()) + throw eckit::Exception("Error receiving response from barrier leader process."); + } +} + +//---------------------------------------------------------------------------------------------------------------------- + struct HammerConfig { // Types @@ -833,258 +1161,6 @@ void FDBHammer::init(const eckit::option::CmdArgs& args) { namespace { -// TODO(ssmart): Create a barrier OBJECT which can maintain some of the state... - -/// Local implementation of internode barriers using TCP sockets. -void barrier_internode(const std::vector& nodes, int port, int max_wait) { - - std::string hostname = Main::hostname(); - if (nodes.size() == 1) { - ASSERT(hostname == nodes[0]); - return; - } - - if (hostname == nodes[0]) { - - // this is the leader node - // accept as many connections as peer nodes, on the designated port. - - eckit::net::TCPServer server(port); - std::vector connections(nodes.size() - 1); - - for (int i = 0; i < connections.size(); ++i) { - connections[i] = server.accept("Waiting for connection", max_wait); - } - - // once all nodes are ready, send barrier end signal to all clients. - - const char message[] = {'E', 'N', 'D'}; - for (auto& socket : connections) { - socket.write(&message[0], sizeof(message)); - socket.closeOutput(); /// ensure all data is sent - } - - // the TCPSockets & TCPServer are auto-closed - } - else { - - // This is NOT the leader node - - /// @note if the server is not yet listening, the client connection wil fail and will be retriede evry second - /// until timeout. - /// This could be adjusted to either retry more frequently, or to maintain persistent connections to the - /// master node to avoid the overhead of establishing the connection every time. - - eckit::net::TCPClient client; - eckit::net::TCPSocket socket = client.connect(nodes[0], port, max_wait, 0, 1); - - // Wait for barrier end - - std::array message; - long read = socket.read(message.data(), message.size()); - ASSERT(::memcmp("END", message.data(), message.size()) == 0); - - // TCPSocket is auto-closed - } -} - -void barrier(size_t ppn, const std::vector& nodes, int port, int max_wait) { - - // If there is only one process per node, we don't need the intra-node barrier - - if (ppn == 1) { - barrier_internode(nodes, port, max_wait); - return; - } - - bool leader_found = false; - while (!leader_found) { - - uid_t uid = ::getuid(); - eckit::Translator uid_to_str; - eckit::PathName default_run_path{"/var/run/user"}; - default_run_path /= uid_to_str(uid); - - eckit::PathName run_path(eckit::Resource("$FDB_HAMMER_RUN_PATH", default_run_path)); - - eckit::PathName wait_fifo = run_path; - wait_fifo /= "fdb-hammer.wait.fifo"; - - eckit::PathName barrier_fifo = run_path; - barrier_fifo /= "fdb-hammer.barrier.fifo"; - - /// attempt exclusive create of an application-unique file - eckit::PathName pid_file = run_path; - pid_file /= "fdb-hammer.pid"; - - int fd; - fd = ::open(pid_file.localPath(), O_EXCL | O_CREAT | O_WRONLY, 0666); - if (fd < 0 && errno != EEXIST) - throw eckit::FailedSystemCall("open", Here(), errno); - - if (fd >= 0) { - - /// if succeeded, this process is the leader - - /// a pair of FIFOs are created. One for clients to communicate the leader they are - /// waiting, and another to open in blocking mode until leader opens it for write - /// once it has synchronised with peer nodes - if (wait_fifo.exists()) - wait_fifo.unlink(); - SYSCALL(::mkfifo(wait_fifo.localPath(), 0666)); - - if (barrier_fifo.exists()) - barrier_fifo.unlink(); - SYSCALL(::mkfifo(barrier_fifo.localPath(), 0666)); - - /// the leader PID is written into the file - SYSCALL(::close(fd)); - std::unique_ptr fh(pid_file.fileHandle()); - eckit::HandleStream hs{*fh}; - fh->openForWrite(eckit::Length(sizeof(long))); - { - eckit::AutoClose closer(*fh); - hs << (long)::getpid(); - } - - /// a signal from each peer process in the node is received - std::vector message = {'S', 'I', 'G'}; - std::unique_ptr fh_wait(wait_fifo.fileHandle()); - /// TODO: handle errors and no-replies on open as well as on read and unlink - fh_wait->openForRead(); - { - eckit::AutoClose closer(*fh_wait); - size_t pending = ppn - 1; - while (pending > 0) { - message = {'0', '0', '0'}; - ASSERT(fh_wait->read(&message[0], message.size()) == message.size()); - ASSERT(std::string(message.begin(), message.end()) == "SIG"); - --pending; - } - } - /// remove the wait fifo - wait_fifo.unlink(false); - - /// once all processes in the node are ready, barrier with peer nodes - std::exception_ptr eptr; - try { - eckit::Timer barrier_timer; - barrier_timer.start(); - barrier_internode(nodes, port, max_wait); - barrier_timer.stop(); - // barrier_timer.reset("Inter-node barrier"); - } - catch (...) { - eptr = std::current_exception(); - } - - /// once all nodes are ready, open the barrier fifo for write which will - /// release all waiting peer processes in the node - std::unique_ptr fh_barrier(barrier_fifo.fileHandle()); - /// TODO: handle errors and no-replies on open as well as unlink - fh_barrier->openForWrite(eckit::Length(0)); - { - eckit::AutoClose closer(*fh_barrier); - /// if the inter-node barrier failed, notify the clients via the barrier fifo - if (eptr) { - message = {'S', 'I', 'G'}; - size_t pending = ppn - 1; - while (pending > 0) { - ASSERT(fh_barrier->write(&message[0], message.size()) == message.size()); - --pending; - } - } - } - - /// remove the barrier fifo - barrier_fifo.unlink(false); - - pid_file.unlink(false); - - /// throw if the inter-node barrier failed - if (eptr) - std::rethrow_exception(eptr); - } - else { - - /// otherwise, the process is a client - - /// the leader PID is read from the file - std::unique_ptr fh(pid_file.fileHandle()); - eckit::HandleStream hs{*fh}; - long pid; - eckit::Length size = fh->openForRead(); - { - eckit::AutoClose closer(*fh); - /// the PID file may be empty if trying too soon - if (size == eckit::Length(0)) { - fh->close(); - ::sleep(1); - size = fh->openForRead(); - } - if (size == eckit::Length(0)) { - throw eckit::SeriousBug("Found empty PID file. Leader too slow or manual remove required."); - } - hs >> pid; - } - pid_t leader_pid = (long)pid; - - /// the leadr PID is checked to exist. If not, clean up and - /// restart election procedure - if (::kill(leader_pid, 0) != 0) { - try { - pid_file.unlink(); - } - catch (eckit::FailedSystemCall& e) { - if (errno != ENOENT) - throw; - } - continue; - } - - /// wait for leader to open barrier FIFO to signal barrier end - auto future = std::async(std::launch::async, [barrier_fifo]() { - try { - /// TODO: handle errors and no-replies on opens as well as on write - /// NOTE: cannot use FileHandle.openForRead as it internally performs - /// an estimate() on the fifo which may have been removed by leader. - int fd; - SYSCALL(fd = ::open(barrier_fifo.localPath(), O_RDONLY)); - eckit::FileDescHandle fh_barrier(fd, true); - - /// check for any errors reported by leader - std::vector message = {'0', '0', '0'}; - long bytes = fh_barrier.read(&message[0], message.size()); - if (bytes > 0) { - ASSERT(bytes == message.size()); - ASSERT(std::string(message.begin(), message.end()) == "SIG"); - return false; - } - - /// fd is autoclosed - return true; - } - catch (std::exception& e) { - return false; - } - }); - - /// signal the leader this process is waiting - std::vector message = {'S', 'I', 'G'}; - std::unique_ptr fh_wait(wait_fifo.fileHandle()); - // eckit::HandleStream hs_wait{*fh_wait}; - fh_wait->openForWrite(eckit::Length(sizeof(long))); - eckit::AutoClose closer(*fh_wait); - ASSERT(fh_wait->write(&message[0], message.size()) == message.size()); - - if (!future.get()) - throw eckit::Exception("Error receiving response from barrier leader process."); - } - - leader_found = true; - } -} - uint32_t xorshift(uint32_t& state) { state ^= (state << 13); state ^= (state >> 17); @@ -1186,13 +1262,10 @@ void FDBHammer::executeWrite() { } if (config_.execution.itt) { - eckit::Timer barrier_timer; - barrier_timer.start(); - // Barrier.block(); - barrier(config_.parallel.ppn, config_.parallel.nodelist, config_.parallel.barrierPort, - config_.parallel.barrierMaxWait); - barrier_timer.stop(); - // barrier_timer.reset("Barrier pre-step 0"); + Barrier barrier{config_.parallel.nodelist, (size_t)config_.parallel.ppn, config_.parallel.barrierPort, + config_.parallel.barrierMaxWait, false}; + barrier.barrier(); + barrier.finalise(); float delay_range = config_.iteration.stepWindow * (config_.iteration.randomDelay / 100); if (delay_range > 0) {