diff --git a/src/fdb5/daos/DaosCatalogueWriter.h b/src/fdb5/daos/DaosCatalogueWriter.h index 0ea9f39bc..8e64b6310 100644 --- a/src/fdb5/daos/DaosCatalogueWriter.h +++ b/src/fdb5/daos/DaosCatalogueWriter.h @@ -37,7 +37,7 @@ class DaosCatalogueWriter : public DaosCatalogue, public CatalogueWriter { /// Mount an existing TocCatalogue, which has a different metadata key (within /// constraints) to allow on-line rebadging of data /// variableKeys: The keys that are allowed to differ between the two DBs - void overlayDB(const Catalogue& otherCatalogue, const std::set& variableKeys, bool unmount) override { NOTIMP; }; + void overlayDB(const Catalogue* srcCatalogue, const eckit::StringSet& variableKeys, bool unmount) override { NOTIMP; }; // // Hide the contents of the DB!!! // void hideContents() override; diff --git a/src/fdb5/database/Catalogue.h b/src/fdb5/database/Catalogue.h index 9c798035e..3d862448e 100644 --- a/src/fdb5/database/Catalogue.h +++ b/src/fdb5/database/Catalogue.h @@ -137,7 +137,7 @@ class CatalogueReader : virtual public Catalogue { public: CatalogueReader() {} - + virtual ~CatalogueReader() {} virtual DbStats stats() const = 0; @@ -156,7 +156,7 @@ class CatalogueWriter : virtual public Catalogue { virtual const Index& currentIndex() = 0; virtual const Key currentIndexKey(); virtual void archive(const Key& idxKey, const Key& datumKey, std::shared_ptr fieldLocation) = 0; - virtual void overlayDB(const Catalogue& otherCatalogue, const std::set& variableKeys, bool unmount) = 0; + virtual void overlayDB(const Catalogue* srcCatalogue, const eckit::StringSet& variableKeys, bool unmount) = 0; virtual void index(const Key& key, const eckit::URI& uri, eckit::Offset offset, eckit::Length length) = 0; virtual void reconsolidate() = 0; }; diff --git a/src/fdb5/remote/Connection.cc b/src/fdb5/remote/Connection.cc index 796b423e7..4f0492282 100644 --- a/src/fdb5/remote/Connection.cc +++ b/src/fdb5/remote/Connection.cc @@ -12,8 +12,6 @@ namespace fdb5::remote { 
//---------------------------------------------------------------------------------------------------------------------- -Connection::Connection() : single_(false) { } - void Connection::teardown() { if (!single_) { diff --git a/src/fdb5/remote/Connection.h b/src/fdb5/remote/Connection.h index 5dad425c0..fe71290d6 100644 --- a/src/fdb5/remote/Connection.h +++ b/src/fdb5/remote/Connection.h @@ -10,7 +10,6 @@ #pragma once -#include "eckit/serialisation/MemoryStream.h" #include "fdb5/remote/Messages.h" #include "eckit/exception/Exceptions.h" @@ -52,7 +51,7 @@ class Connection : eckit::NonCopyable { using PayloadList = std::vector; public: // methods - Connection(); + Connection() = default; virtual ~Connection() = default; @@ -82,7 +81,7 @@ class Connection : eckit::NonCopyable { virtual const eckit::net::TCPSocket& dataSocket() const = 0; protected: // members - bool single_; + bool single_ {false}; private: // members mutable std::mutex controlMutex_; diff --git a/src/fdb5/remote/Messages.cc b/src/fdb5/remote/Messages.cc index 638154b2c..4269a8039 100644 --- a/src/fdb5/remote/Messages.cc +++ b/src/fdb5/remote/Messages.cc @@ -15,10 +15,27 @@ #include "fdb5/remote/Messages.h" +#include +#include + namespace fdb5::remote { //---------------------------------------------------------------------------------------------------------------------- +Payload::Payload(const BufferStream& buffer) : length {buffer.length()}, data {buffer.data()} { } + +Payload::Payload(const std::size_t length, const void* data) : length {length}, data {data} { } + +bool Payload::empty() const { + return data == nullptr && length == 0; +} + +bool Payload::consistent() const { + return ((length == 0) ^ (data == nullptr)) == 0; +} + +//---------------------------------------------------------------------------------------------------------------------- + std::ostream& operator<<(std::ostream& s, const Message& m) { switch (m) { case Message::None: s << "None"; break; @@ -46,6 +63,7 @@ std::ostream& 
operator<<(std::ostream& s, const Message& m) { case Message::Store: s << "Store"; break; case Message::Axes: s << "Axes"; break; case Message::Exists: s << "Exists"; break; + case Message::Overlay: s << "Overlay"; break; // Responses case Message::Received: s << "Received"; break; diff --git a/src/fdb5/remote/Messages.h b/src/fdb5/remote/Messages.h index e8efc8271..784cbc27a 100644 --- a/src/fdb5/remote/Messages.h +++ b/src/fdb5/remote/Messages.h @@ -18,27 +18,50 @@ #pragma once +#include "eckit/io/Buffer.h" +#include "eckit/serialisation/MemoryStream.h" +#include "eckit/types/FixedString.h" + #include #include #include - -#include "eckit/types/FixedString.h" - -namespace eckit { - class Stream; -} +#include namespace fdb5::remote { //---------------------------------------------------------------------------------------------------------------------- +struct BufferStream; + struct Payload { - Payload(std::size_t length, const void* data) : length {length}, data {data} { } + Payload() = default; + + explicit Payload(const BufferStream& buffer); + + Payload(std::size_t length, const void* data); + + bool empty() const; + + /// @brief Checks if this object is in a consistent state. + /// @returns True if (length & data) is (zero & null) or (non-zero & non-null). 
+ bool consistent() const; std::size_t length {0}; const void* data {nullptr}; }; +struct BufferStream : private eckit::Buffer, public eckit::MemoryStream { + explicit BufferStream(const size_t size) : eckit::Buffer(size), eckit::MemoryStream(eckit::Buffer::data(), size) { } /* qualified on purpose: unqualified data() finds the const accessor below (it hides Buffer::data()), yielding const void* and thus presumably MemoryStream's read-only constructor - TODO confirm against eckit */ + + size_t length() const { return eckit::MemoryStream::position(); } + + const void* data() const { return eckit::Buffer::data(); } + + Payload payload() const { return {length(), data()}; } +}; + +//---------------------------------------------------------------------------------------------------------------------- + enum class Message : uint16_t { // Server instructions @@ -67,6 +90,7 @@ enum class Message : uint16_t { Store, Axes, Exists, + Overlay, // Responses Received = 200, @@ -86,11 +110,11 @@ std::ostream& operator<<(std::ostream& s, const Message& m); class MessageHeader { public: // types - constexpr static uint16_t currentVersion = 12; + static constexpr uint16_t currentVersion = 12; - constexpr static const auto hashBytes = 16; + static constexpr uint16_t hashBytes = 16; - constexpr static const auto markerBytes = 4; + static constexpr uint16_t markerBytes = 4; using MarkerType = eckit::FixedString; diff --git a/src/fdb5/remote/client/Client.cc b/src/fdb5/remote/client/Client.cc index 728f8ae9d..46d0fe021 100644 --- a/src/fdb5/remote/client/Client.cc +++ b/src/fdb5/remote/client/Client.cc @@ -55,35 +55,33 @@ Client::~Client() { connection_.remove(id_); } -void Client::controlWriteCheckResponse(const Message msg, - const uint32_t requestID, - const bool dataListener, - const void* const payload, - const uint32_t payloadLength) const { +//---------------------------------------------------------------------------------------------------------------------- + +void Client::controlWriteCheckResponse(const Message msg, + const uint32_t requestID, + const bool dataListener, + const Payload payload) const { ASSERT(requestID); - ASSERT(!(!payloadLength ^ !payload)); - std::lock_guard 
lock(blockingRequestMutex_); + ASSERT(payload.consistent()); + std::lock_guard lock(blockingRequestMutex_); PayloadList payloads; - if (payloadLength > 0) { payloads.emplace_back(payloadLength, payload); } + if (!payload.empty()) { payloads.emplace_back(payload); } auto f = connection_.controlWrite(*this, msg, requestID, dataListener, payloads); f.wait(); ASSERT(f.get().size() == 0); } -eckit::Buffer Client::controlWriteReadResponse(const Message msg, - const uint32_t requestID, - const void* const payload, - const uint32_t payloadLength) const { +eckit::Buffer Client::controlWriteReadResponse(const Message msg, const uint32_t requestID, const Payload payload) const { ASSERT(requestID); - ASSERT(!(!payloadLength ^ !payload)); - std::lock_guard lock(blockingRequestMutex_); + ASSERT(payload.consistent()); + std::lock_guard lock(blockingRequestMutex_); PayloadList payloads; - if (payloadLength > 0) { payloads.emplace_back(payloadLength, payload); } + if (!payload.empty()) { payloads.emplace_back(payload); } auto f = connection_.controlWrite(*this, msg, requestID, false, payloads); f.wait(); @@ -94,4 +92,6 @@ void Client::dataWrite(Message msg, uint32_t requestID, PayloadList payloads) { connection_.dataWrite(*this, msg, requestID, std::move(payloads)); } +//---------------------------------------------------------------------------------------------------------------------- + } // namespace fdb5::remote diff --git a/src/fdb5/remote/client/Client.h b/src/fdb5/remote/client/Client.h index dfaa5829b..36a429815 100644 --- a/src/fdb5/remote/client/Client.h +++ b/src/fdb5/remote/client/Client.h @@ -10,14 +10,18 @@ #pragma once -#include "eckit/memory/NonCopyable.h" -#include "eckit/net/Endpoint.h" - #include "fdb5/remote/Connection.h" #include "fdb5/remote/Messages.h" #include "fdb5/remote/client/ClientConnection.h" +#include "eckit/memory/NonCopyable.h" +#include "eckit/net/Endpoint.h" +#include "eckit/serialisation/MemoryStream.h" + +#include // std::size_t +#include // 
std::uint32_t #include +#include #include // std::pair #include @@ -45,7 +49,7 @@ class Client : eckit::NonCopyable { public: // methods Client(const eckit::net::Endpoint& endpoint, const std::string& defaultEndpoint); - Client(const EndpointList& endpoints); + explicit Client(const EndpointList& endpoints); virtual ~Client(); @@ -60,21 +64,42 @@ class Client : eckit::NonCopyable { uint32_t generateRequestID() const { return connection_.generateRequestID(); } // blocking requests + + void controlWriteCheckResponse(Message msg, uint32_t requestID, bool dataListener, Payload payload = {}) const; + + void controlWriteCheckResponse(Message msg, uint32_t requestID, bool dataListener, const BufferStream& buffer) const { + controlWriteCheckResponse(msg, requestID, dataListener, buffer.payload()); + } + void controlWriteCheckResponse(Message msg, uint32_t requestID, bool dataListener, - const void* payload = nullptr, - uint32_t payloadLength = 0) const; + const void* payload, + uint32_t payloadLength) const { + controlWriteCheckResponse(msg, requestID, dataListener, {payloadLength, payload}); + } + + [[nodiscard]] + eckit::Buffer controlWriteReadResponse(Message msg, uint32_t requestID, Payload payload = {}) const; + [[nodiscard]] + eckit::Buffer controlWriteReadResponse(Message msg, uint32_t requestID, const BufferStream& buffer) const { + return controlWriteReadResponse(msg, requestID, buffer.payload()); + } + + [[nodiscard]] eckit::Buffer controlWriteReadResponse(Message msg, uint32_t requestID, - const void* payload = nullptr, - uint32_t payloadLength = 0) const; - + const void* payload, + uint32_t payloadLength) const { + return controlWriteReadResponse(msg, requestID, {payloadLength, payload}); + } void dataWrite(Message msg, uint32_t requestID, PayloadList payloads = {}); // handlers for incoming messages - to be defined in the client class - virtual bool handle(Message message, uint32_t requestID) = 0; + + virtual bool handle(Message message, uint32_t requestID) = 
0; + virtual bool handle(Message message, uint32_t requestID, eckit::Buffer&& payload) = 0; protected: @@ -89,4 +114,6 @@ class Client : eckit::NonCopyable { mutable std::mutex blockingRequestMutex_; }; +//---------------------------------------------------------------------------------------------------------------------- + } // namespace fdb5::remote diff --git a/src/fdb5/remote/client/ClientConnection.cc b/src/fdb5/remote/client/ClientConnection.cc index 2501f7b83..3ec0d0d4e 100644 --- a/src/fdb5/remote/client/ClientConnection.cc +++ b/src/fdb5/remote/client/ClientConnection.cc @@ -198,7 +198,8 @@ void ClientConnection::dataWrite(DataWriteRequest& request) const { void ClientConnection::dataWrite(Client& client, remote::Message msg, uint32_t requestID, PayloadList payloads) { - static size_t maxQueueLength = eckit::Resource("fdbDataWriteQueueLength;$FDB_DATA_WRITE_QUEUE_LENGTH", 320); + static const size_t maxQueueLength = + eckit::Resource("fdbDataWriteQueueLength;$FDB_DATA_WRITE_QUEUE_LENGTH", defaultDataWriteQueueLength); { // retrieve or add client to the list diff --git a/src/fdb5/remote/client/ClientConnection.h b/src/fdb5/remote/client/ClientConnection.h index d2e362fc6..223ade5e9 100644 --- a/src/fdb5/remote/client/ClientConnection.h +++ b/src/fdb5/remote/client/ClientConnection.h @@ -34,6 +34,8 @@ class DataWriteRequest; class ClientConnection : protected Connection { + static constexpr size_t defaultDataWriteQueueLength = 320; + public: // methods ~ClientConnection() override; diff --git a/src/fdb5/remote/client/RemoteCatalogue.cc b/src/fdb5/remote/client/RemoteCatalogue.cc index 319ae9c97..ac3f964bb 100644 --- a/src/fdb5/remote/client/RemoteCatalogue.cc +++ b/src/fdb5/remote/client/RemoteCatalogue.cc @@ -10,6 +10,7 @@ #include "fdb5/remote/client/RemoteCatalogue.h" +#include "eckit/exception/Exceptions.h" #include "fdb5/LibFdb5.h" #include "fdb5/database/Key.h" #include "fdb5/remote/Messages.h" @@ -36,14 +37,13 @@ Schema* fetchSchema(const Key& dbKey, 
const RemoteCatalogue& catalogue) { LOG_DEBUG_LIB(LibFdb5) << "Fetching schema from remote catalogue: " << catalogue.controlEndpoint() << std::endl; // send dbkey to remote - eckit::Buffer keyBuffer(RemoteCatalogue::defaultBufferSizeKey); - eckit::MemoryStream keyStream(keyBuffer); - keyStream << dbKey; + BufferStream keyBuffer(RemoteCatalogue::defaultBufferSizeKey); + keyBuffer << dbKey; const auto requestID = catalogue.generateRequestID(); // receive schema from remote - auto recvBuf = catalogue.controlWriteReadResponse(Message::Schema, requestID, keyBuffer, keyStream.position()); + auto recvBuf = catalogue.controlWriteReadResponse(Message::Schema, requestID, keyBuffer); eckit::MemoryStream schemaStream(recvBuf); return eckit::Reanimator::reanimate(schemaStream); @@ -79,14 +79,13 @@ void RemoteCatalogue::archive(const Key& idxKey, const Key& datumKey, std::share numLocations_++; } - Buffer buffer(defaultBufferSizeArchive); - MemoryStream stream(buffer); - stream << idxKey; - stream << datumKey; - stream << *fieldLocation; + BufferStream buffer(defaultBufferSizeArchive); + buffer << idxKey; + buffer << datumKey; + buffer << *fieldLocation; PayloadList payloads; - payloads.emplace_back(stream.position(), buffer.data()); + payloads.emplace_back(buffer); dataWrite(Message::Blob, id, payloads); @@ -125,14 +124,13 @@ void RemoteCatalogue::flush(size_t archivedFields) { // Flush only does anything if there is an ongoing archive(); if (numLocations_ > 0) { - eckit::Buffer sendBuf(defaultBufferSizeFlush); - eckit::MemoryStream s(sendBuf); - s << numLocations_; + BufferStream sendBuf(defaultBufferSizeFlush); + sendBuf << numLocations_; LOG_DEBUG_LIB(LibFdb5) << " RemoteCatalogue::flush - flushing " << numLocations_ << " fields" << std::endl; // The flush call is blocking - controlWriteCheckResponse(Message::Flush, generateRequestID(), false, sendBuf, s.position()); + controlWriteCheckResponse(Message::Flush, generateRequestID(), false, sendBuf); numLocations_ = 0; } @@ 
-146,11 +144,10 @@ bool RemoteCatalogue::exists() const { bool result = false; - eckit::Buffer sendBuf(defaultBufferSizeKey); - eckit::MemoryStream sms(sendBuf); - sms << dbKey_; + BufferStream sendBuf(defaultBufferSizeKey); + sendBuf << dbKey_; - auto recvBuf = controlWriteReadResponse(Message::Exists, generateRequestID(), sendBuf, sms.position()); + auto recvBuf = controlWriteReadResponse(Message::Exists, generateRequestID(), sendBuf); eckit::MemoryStream rms(recvBuf); rms >> result; @@ -182,7 +179,27 @@ bool RemoteCatalogue::handle(Message message, uint32_t requestID, eckit::Buffer& return false; } -void RemoteCatalogue::overlayDB(const Catalogue& otherCatalogue, const std::set& variableKeys, bool unmount) {NOTIMP;} +void RemoteCatalogue::overlayDB(const Catalogue* srcCatalogue, const eckit::StringSet& variableKeys, bool unmount) { + + const auto* source = dynamic_cast(srcCatalogue); + + if (!source) { throw eckit::UserError("Cannot overlay this DB with a non-remote source catalogue!", Here()); } + + if (source->controlEndpoint() != controlEndpoint()) { + throw eckit::UserError("Cannot overlay DBs from different servers!", Here()); + } + + LOG_DEBUG_LIB(LibFdb5) << "RemoteCatalogue::overlayDB srcDB:" << *srcCatalogue << " dbKey: " << dbKey_ << std::endl; + + BufferStream sendBuf(3 * defaultBufferSizeKey); + sendBuf << source->key(); + sendBuf << dbKey_; + sendBuf << variableKeys; + sendBuf << unmount; + + controlWriteCheckResponse(Message::Overlay, generateRequestID(), false, sendBuf); +} + void RemoteCatalogue::index(const Key& key, const eckit::URI& uri, eckit::Offset offset, eckit::Length length) {NOTIMP;} void RemoteCatalogue::reconsolidate(){NOTIMP;} std::vector RemoteCatalogue::metadataPaths() const {NOTIMP;} diff --git a/src/fdb5/remote/client/RemoteCatalogue.h b/src/fdb5/remote/client/RemoteCatalogue.h index dca8b7e84..f12d59bfc 100644 --- a/src/fdb5/remote/client/RemoteCatalogue.h +++ b/src/fdb5/remote/client/RemoteCatalogue.h @@ -36,7 +36,9 @@ class 
RemoteCatalogue : public CatalogueReader, public CatalogueWriter, public C // From CatalogueWriter const Index& currentIndex() override; void archive(const Key& idxKey, const Key& datumKey, std::shared_ptr fieldLocation) override; - void overlayDB(const Catalogue& otherCatalogue, const std::set& variableKeys, bool unmount) override; + + void overlayDB(const Catalogue* srcCatalogue, const eckit::StringSet& variableKeys, bool unmount) override; + void index(const Key& key, const eckit::URI& uri, eckit::Offset offset, eckit::Length length) override; void reconsolidate() override; diff --git a/src/fdb5/remote/client/RemoteStore.cc b/src/fdb5/remote/client/RemoteStore.cc index ea340595a..8c09f61b7 100644 --- a/src/fdb5/remote/client/RemoteStore.cc +++ b/src/fdb5/remote/client/RemoteStore.cc @@ -247,11 +247,10 @@ bool RemoteStore::exists() const { bool result = false; - eckit::Buffer sendBuf(defaultBufferSizeKey); - eckit::MemoryStream sms(sendBuf); - sms << dbKey_; + BufferStream sendBuf(defaultBufferSizeKey); + sendBuf << dbKey_; - auto recvBuf = controlWriteReadResponse(Message::Exists, generateRequestID(), sendBuf, sms.position()); + auto recvBuf = controlWriteReadResponse(Message::Exists, generateRequestID(), sendBuf); eckit::MemoryStream rms(recvBuf); rms >> result; @@ -279,13 +278,12 @@ void RemoteStore::archive(const Key& key, const void *data, eckit::Length length // store the callback, associated with the request id - to be done BEFORE sending the data locations_.archive(id, catalogue_archive); - eckit::Buffer keyBuffer(defaultBufferSizeKey); - eckit::MemoryStream keyStream(keyBuffer); - keyStream << dbKey_; - keyStream << key; + BufferStream keyBuffer(defaultBufferSizeKey); + keyBuffer << dbKey_; + keyBuffer << key; PayloadList payloads; - payloads.emplace_back(keyStream.position(), keyBuffer.data()); + payloads.emplace_back(keyBuffer); payloads.emplace_back(length, data); dataWrite(Message::Blob, id, payloads); @@ -387,14 +385,10 @@ bool 
RemoteStore::handle(Message message, uint32_t requestID, eckit::Buffer&& pa switch (message) { case Message::Store: { // received a Field location from the remote store, can forward to the archiver for the indexing - MemoryStream s(payload); + eckit::MemoryStream s(payload); std::unique_ptr location(eckit::Reanimator::reanimate(s)); - if (defaultEndpoint().empty()) { - return locations_.location(requestID, std::move(location)); - } else { - std::unique_ptr remoteLocation = std::unique_ptr(new RemoteFieldLocation(eckit::net::Endpoint{defaultEndpoint()}, *location)); - return locations_.location(requestID, std::move(remoteLocation)); - } + if (defaultEndpoint().empty()) { return locations_.location(requestID, std::move(location)); } + return locations_.location(requestID, std::make_unique(defaultEndpoint(), *location)); } case Message::Blob: { auto it = messageQueues_.find(requestID); @@ -435,25 +429,23 @@ eckit::DataHandle* RemoteStore::dataHandle(const FieldLocation& fieldLocation) { eckit::DataHandle* RemoteStore::dataHandle(const FieldLocation& fieldLocation, const Key& remapKey) { - Buffer encodeBuffer(4096); - MemoryStream s(encodeBuffer); - s << fieldLocation; - s << remapKey; + BufferStream encodeBuffer(defaultEncodeBufferSize); + encodeBuffer << fieldLocation; + encodeBuffer << remapKey; uint32_t id = generateRequestID(); - static size_t queueSize = 320; std::shared_ptr queue = nullptr; { std::lock_guard lock(retrieveMessageMutex_); - auto entry = retrieveMessageQueues_.emplace(id, std::make_shared(queueSize)); + auto entry = retrieveMessageQueues_.emplace(id, std::make_shared(defaultRetrieveQueueLength)); ASSERT(entry.second); queue = entry.first->second; } - controlWriteCheckResponse(fdb5::remote::Message::Read, id, true, encodeBuffer, s.position()); + controlWriteCheckResponse(fdb5::remote::Message::Read, id, true, encodeBuffer); return new FDBRemoteDataHandle(id, fieldLocation.length(), queue, controlEndpoint()); } diff --git 
a/src/fdb5/remote/client/RemoteStore.h b/src/fdb5/remote/client/RemoteStore.h index 50b7e5f96..356d3fa1d 100644 --- a/src/fdb5/remote/client/RemoteStore.h +++ b/src/fdb5/remote/client/RemoteStore.h @@ -108,6 +108,9 @@ class RemoteStore : public Store, public Client { using StoredMessage = std::pair; using MessageQueue = eckit::Queue; + static constexpr size_t defaultEncodeBufferSize = 4096; + static constexpr size_t defaultRetrieveQueueLength = 320; + static const char* typeName() { return "remote"; } public: // methods diff --git a/src/fdb5/remote/server/CatalogueHandler.cc b/src/fdb5/remote/server/CatalogueHandler.cc index 75ac5c01a..57d2be59a 100644 --- a/src/fdb5/remote/server/CatalogueHandler.cc +++ b/src/fdb5/remote/server/CatalogueHandler.cc @@ -9,6 +9,8 @@ */ #include "fdb5/remote/server/CatalogueHandler.h" +#include "eckit/log/Log.h" +#include "eckit/types/Types.h" #include "fdb5/LibFdb5.h" #include "fdb5/api/helpers/FDBToolRequest.h" #include "fdb5/database/Catalogue.h" @@ -21,6 +23,7 @@ #include "eckit/net/TCPSocket.h" #include "eckit/serialisation/MemoryStream.h" +#include // uint32_t #include #include #include @@ -29,6 +32,8 @@ using namespace eckit; namespace fdb5::remote { +//---------------------------------------------------------------------------------------------------------------------- + // *************************************************************************************** // // Note that we use the "control" and "data" connections in a specific way, although these @@ -40,8 +45,6 @@ namespace fdb5::remote { CatalogueHandler::CatalogueHandler(eckit::net::TCPSocket& socket, const Config& config): ServerConnection(socket, config), fdbControlConnection_(false), fdbDataConnection_(false) {} -CatalogueHandler::~CatalogueHandler() {} - Handled CatalogueHandler::handleControl(Message message, uint32_t clientID, uint32_t requestID) { try { @@ -123,6 +126,10 @@ Handled CatalogueHandler::handleControl(Message message, uint32_t clientID, uint 
exists(clientID, requestID, std::move(payload)); return Handled::Replied; + case Message::Overlay: // overlay catalogue + overlay(clientID, requestID, std::move(payload)); + return Handled::Yes; + default: { std::stringstream ss; ss << "ERROR: Unexpected message recieved (" << message << "). ABORTING"; @@ -142,6 +149,7 @@ Handled CatalogueHandler::handleControl(Message message, uint32_t clientID, uint return Handled::No; } +//---------------------------------------------------------------------------------------------------------------------- // API forwarding logic, adapted from original remoteHandler // Used for Inspect and List @@ -179,7 +187,7 @@ struct ListHelper : public BaseHelper { }; struct AxesHelper : public BaseHelper { - virtual size_t encodeBufferSize(const AxesElement& el) const { return el.encodeSize(); } + size_t encodeBufferSize(const AxesElement& el) const override { return el.encodeSize(); } void extraDecode(eckit::Stream& s) { s >> level_; @@ -254,6 +262,8 @@ void CatalogueHandler::forwardApiCall(uint32_t clientID, uint32_t requestID, eck })); } +//---------------------------------------------------------------------------------------------------------------------- + void CatalogueHandler::list(uint32_t clientID, uint32_t requestID, eckit::Buffer&& payload) { forwardApiCall(clientID, requestID, std::move(payload)); } @@ -366,11 +376,34 @@ void CatalogueHandler::exists(uint32_t clientID, uint32_t requestID, eckit::Buff exists = CatalogueReaderFactory::instance().build(dbKey, config_)->exists(); } - eckit::Buffer existBuf(5); - eckit::MemoryStream stream(existBuf); - stream << exists; + BufferStream existsBuf(5); // 5 bytes (4-byte int + 1-byte tag) + existsBuf << exists; + // reply to the client + write(Message::Received, true, clientID, requestID, {existsBuf.payload()}); +} + +void CatalogueHandler::overlay(uint32_t /*clientID*/, uint32_t /*requestID*/, eckit::Buffer&& payload) const { + + ASSERT(payload.size() > 0); + + eckit::MemoryStream 
stream(payload); + + const Key srcKey(stream); + const Key tgtKey(stream); + + eckit::StringSet variableKeys; + stream >> variableKeys; - write(Message::Received, true, clientID, requestID, existBuf.data(), stream.position()); + bool unmount = false; + stream >> unmount; + + auto srcDB = CatalogueReaderFactory::instance().build(srcKey, config_); + auto tgtDB = CatalogueWriterFactory::instance().build(tgtKey, config_); + + tgtDB->overlayDB(srcDB.get(), variableKeys, unmount); + + eckit::Log::info() << "Overlay completed!" << std::endl; + eckit::Log::status() << "Overlay completed!" << std::endl; } void CatalogueHandler::flush(uint32_t clientID, uint32_t requestID, eckit::Buffer&& payload) { @@ -487,4 +520,6 @@ CatalogueWriter& CatalogueHandler::catalogue(uint32_t id, const Key& dbKey) { return *((catalogues_.emplace(id, CatalogueArchiver(!single_, dbKey, config_)).first)->second.catalogue); } +//---------------------------------------------------------------------------------------------------------------------- + } // namespace fdb5::remote diff --git a/src/fdb5/remote/server/CatalogueHandler.h b/src/fdb5/remote/server/CatalogueHandler.h index 9d10888af..b0d2e3bab 100644 --- a/src/fdb5/remote/server/CatalogueHandler.h +++ b/src/fdb5/remote/server/CatalogueHandler.h @@ -38,10 +38,9 @@ struct CatalogueArchiver { //---------------------------------------------------------------------------------------------------------------------- class CatalogueHandler : public ServerConnection { -public: // methods +public: // methods CatalogueHandler(eckit::net::TCPSocket& socket, const Config& config); - ~CatalogueHandler() override; private: // methods @@ -60,6 +59,7 @@ class CatalogueHandler : public ServerConnection { void schema(uint32_t clientID, uint32_t requestID, eckit::Buffer&& payload); void stores(uint32_t clientID, uint32_t requestID); void exists(uint32_t clientID, uint32_t requestID, eckit::Buffer&& payload) const; + void overlay(uint32_t clientID, uint32_t 
requestID, eckit::Buffer&& payload) const; void archiveBlob(const uint32_t clientID, const uint32_t requestID, const void* data, size_t length) override; diff --git a/src/fdb5/remote/server/ServerConnection.cc b/src/fdb5/remote/server/ServerConnection.cc index 85d24e25b..ad89878dc 100644 --- a/src/fdb5/remote/server/ServerConnection.cc +++ b/src/fdb5/remote/server/ServerConnection.cc @@ -55,9 +55,6 @@ namespace fdb5::remote { namespace { -constexpr const auto defaultRetrieveQueueSize = 10000; -constexpr const auto defaultArchiveQueueSize = 320; - std::vector intersection(const eckit::LocalConfiguration& c1, const eckit::LocalConfiguration& c2, const std::string& field){ std::vector v1 = c1.getIntVector(field); @@ -80,8 +77,8 @@ std::vector intersection(const eckit::LocalConfiguration& c1, const eckit:: ServerConnection::ServerConnection(eckit::net::TCPSocket& socket, const Config& config) : config_(config), dataListenHostname_(config.getString("dataListenHostname", "")), - readLocationQueue_(eckit::Resource("fdbRetrieveQueueSize", defaultRetrieveQueueSize)), - archiveQueue_(eckit::Resource("fdbServerMaxQueueSize", defaultArchiveQueueSize)), + readLocationQueue_(eckit::Resource("fdbRetrieveQueueSize", defaultRetrieveQueueLength)), + archiveQueue_(eckit::Resource("fdbServerMaxQueueSize", defaultArchiveQueueLength)), controlSocket_(socket) { LOG_DEBUG_LIB(LibFdb5) << "ServerConnection::ServerConnection initialized" << std::endl; diff --git a/src/fdb5/remote/server/ServerConnection.h b/src/fdb5/remote/server/ServerConnection.h index 6a4ba27e6..589cedf07 100644 --- a/src/fdb5/remote/server/ServerConnection.h +++ b/src/fdb5/remote/server/ServerConnection.h @@ -84,7 +84,14 @@ struct ArchiveElem { clientID_(clientID), requestID_(requestID), payload_(std::move(payload)), multiblob_(multiblob) {} }; +//---------------------------------------------------------------------------------------------------------------------- + class ServerConnection : public Connection, public 
Handler { + + static constexpr size_t defaultRetrieveQueueLength = 10000; /* default for "fdbRetrieveQueueSize"; same value as the removed defaultRetrieveQueueSize */ + + static constexpr size_t defaultArchiveQueueLength = 320; /* default for "fdbServerMaxQueueSize"; same value as the removed defaultArchiveQueueSize */ + public: // methods ServerConnection(eckit::net::TCPSocket& socket, const Config& config); ~ServerConnection() override; diff --git a/src/fdb5/remote/server/StoreHandler.cc b/src/fdb5/remote/server/StoreHandler.cc index c0c290e47..b7e8f5d4d 100644 --- a/src/fdb5/remote/server/StoreHandler.cc +++ b/src/fdb5/remote/server/StoreHandler.cc @@ -290,11 +290,11 @@ void StoreHandler::exists(const uint32_t clientID, const uint32_t requestID, con exists = StoreFactory::instance().build(dbKey, config_)->exists(); } - eckit::Buffer existBuf(5); - eckit::MemoryStream stream(existBuf); - stream << exists; + BufferStream existsBuf(5); + existsBuf << exists; - write(Message::Received, true, clientID, requestID, existBuf.data(), stream.position()); + // reply to the client + write(Message::Received, true, clientID, requestID, {existsBuf.payload()}); } //---------------------------------------------------------------------------------------------------------------------- diff --git a/src/fdb5/toc/TocCatalogue.cc b/src/fdb5/toc/TocCatalogue.cc index 5fa12a5d0..cc73e0f29 100644 --- a/src/fdb5/toc/TocCatalogue.cc +++ b/src/fdb5/toc/TocCatalogue.cc @@ -14,10 +14,11 @@ #include "fdb5/rules/Rule.h" #include "fdb5/toc/RootManager.h" #include "fdb5/toc/TocCatalogue.h" +#include "fdb5/toc/TocEngine.h" +#include "fdb5/toc/TocMoveVisitor.h" #include "fdb5/toc/TocPurgeVisitor.h" #include "fdb5/toc/TocStats.h" #include "fdb5/toc/TocWipeVisitor.h" -#include "fdb5/toc/TocMoveVisitor.h" using namespace eckit; diff --git a/src/fdb5/toc/TocCatalogue.h b/src/fdb5/toc/TocCatalogue.h index 9088853bf..db8b032fb 100644 --- a/src/fdb5/toc/TocCatalogue.h +++ b/src/fdb5/toc/TocCatalogue.h @@ -21,7 +21,6 @@ #include "fdb5/rules/Schema.h" #include "fdb5/toc/FileSpace.h" #include "fdb5/toc/TocHandler.h" -#include "fdb5/toc/TocEngine.h" namespace fdb5 { @@ -45,6 +44,8 @@ class 
TocCatalogue : public CatalogueImpl, public TocHandler { bool enabled(const ControlIdentifier& controlIdentifier) const override; + bool exists() const override; + public: // constants static const std::string DUMP_PARAM_WALKSUBTOC; @@ -60,7 +61,7 @@ class TocCatalogue : public CatalogueImpl, public TocHandler { std::vector* remapKeys = nullptr) const; void checkUID() const override; - bool exists() const override; + void dump(std::ostream& out, bool simple, const eckit::Configuration& conf) const override; std::vector metadataPaths() const override; const Schema& schema() const override; diff --git a/src/fdb5/toc/TocCatalogueWriter.cc b/src/fdb5/toc/TocCatalogueWriter.cc index 0099af0b2..19a43108d 100644 --- a/src/fdb5/toc/TocCatalogueWriter.cc +++ b/src/fdb5/toc/TocCatalogueWriter.cc @@ -8,21 +8,20 @@ * does it submit to any jurisdiction. */ -#include "fdb5/fdb5_config.h" - -#include "eckit/config/Resource.h" +#include "eckit/exception/Exceptions.h" +#include "eckit/filesystem/PathName.h" #include "eckit/log/Log.h" -#include "eckit/log/Bytes.h" -#include "eckit/io/EmptyHandle.h" +#include "eckit/types/Types.h" -#include "fdb5/database/EntryVisitMechanism.h" -#include "fdb5/io/FDBFileHandle.h" #include "fdb5/LibFdb5.h" +#include "fdb5/database/EntryVisitMechanism.h" +#include "fdb5/io/LustreSettings.h" +#include "fdb5/toc/RootManager.h" +#include "fdb5/toc/TocCatalogue.h" #include "fdb5/toc/TocCatalogueWriter.h" #include "fdb5/toc/TocFieldLocation.h" #include "fdb5/toc/TocIndex.h" -#include "fdb5/toc/RootManager.h" -#include "fdb5/io/LustreSettings.h" +#include using namespace eckit; @@ -240,34 +239,49 @@ const TocSerialisationVersion& TocCatalogueWriter::serialisationVersion() const return TocHandler::serialisationVersion(); } -void TocCatalogueWriter::overlayDB(const Catalogue& otherCat, const std::set& variableKeys, bool unmount) { +void TocCatalogueWriter::overlayDB(const Catalogue* srcCatalogue, const eckit::StringSet& variableKeys, bool unmount) { + + const 
auto* source = dynamic_cast(srcCatalogue); - const TocCatalogue& otherCatalogue = dynamic_cast(otherCat); - const Key& otherKey(otherCatalogue.key()); + if (!source) { throw eckit::UserError("Cannot overlay toc DB with a non-toc source DB!", Here()); } - if (otherKey.size() != TocCatalogue::dbKey_.size()) { - std::stringstream ss; - ss << "Keys insufficiently matching for mount: " << TocCatalogue::dbKey_ << " : " << otherKey; - throw UserError(ss.str(), Here()); + if (!source->exists()) { + std::ostringstream oss; + oss << "Source database is not found: " << *source << std::endl; + throw eckit::UserError(oss.str(), Here()); } - // Build the difference map from the old to the new key + if (source->uri() == uri()) { + std::ostringstream oss; + oss << "Source and target cannot be same! " << source->uri() << " : " << uri(); + throw eckit::UserError(oss.str(), Here()); + } - for (const auto& kv : TocCatalogue::dbKey_) { + const auto& srcKey = source->key(); - auto it = otherKey.find(kv.first); - if (it == otherKey.end()) { - std::stringstream ss; - ss << "Keys insufficiently matching for mount: " << TocCatalogue::dbKey_ << " : " << otherKey; - throw UserError(ss.str(), Here()); + // check keywords are matching + if (srcKey.keys() != dbKey_.keys()) { + std::ostringstream oss; + oss << "Keys are not matching! 
" << srcKey << " : " << dbKey_ << std::endl; + throw eckit::UserError(oss.str(), Here()); + } + + // check values are as expected + for (const auto& [keyword, value] : dbKey_) { + + const auto isDifferent = srcKey.find(keyword)->second != value; + const auto isVariable = variableKeys.find(keyword) != variableKeys.end(); + + if (isDifferent && !isVariable) { + std::ostringstream oss; + oss << "Keyword [" << keyword << "] must not differ between DBs: " << srcKey << " : " << dbKey_; + throw eckit::UserError(oss.str(), Here()); } - if (kv.second != it->second) { - if (variableKeys.find(kv.first) == variableKeys.end()) { - std::stringstream ss; - ss << "Key " << kv.first << " not allowed to differ between DBs: " << TocCatalogue::dbKey_ << " : " << otherKey; - throw UserError(ss.str(), Here()); - } + if (isVariable && !isDifferent) { + std::ostringstream oss; + oss << "Variable Keyword [" << keyword << "] must differ between DBs: " << srcKey << " : " << dbKey_; + throw eckit::UserError(oss.str(), Here()); } } @@ -276,19 +290,19 @@ void TocCatalogueWriter::overlayDB(const Catalogue& otherCat, const std::set subtocs; + eckit::StringSet subtocs; loadIndexes(false, &subtocs); - eckit::PathName stPath(otherCatalogue.tocPath()); + eckit::PathName stPath(source->tocPath()); if (subtocs.find(stPath) == subtocs.end()) { std::stringstream ss; - ss << "Cannot unmount DB: " << otherCatalogue << ". 
Not currently mounted"; - throw UserError(ss.str(), Here()); + ss << "Cannot unmount source DB [" << source << "] that is already unmounted!"; + throw eckit::UserError(ss.str(), Here()); } - writeSubTocMaskRecord(otherCatalogue); + writeSubTocMaskRecord(*source); } else { - writeSubTocRecord(otherCatalogue); + writeSubTocRecord(*source); } } @@ -423,4 +437,4 @@ static CatalogueWriterBuilder builder("toc"); //---------------------------------------------------------------------------------------------------------------------- -} // namespace fdb5 +} // namespace fdb5 diff --git a/src/fdb5/toc/TocCatalogueWriter.h b/src/fdb5/toc/TocCatalogueWriter.h index d428e69ad..c42833f43 100644 --- a/src/fdb5/toc/TocCatalogueWriter.h +++ b/src/fdb5/toc/TocCatalogueWriter.h @@ -19,8 +19,6 @@ #include "eckit/os/AutoUmask.h" #include "fdb5/database/Index.h" -#include "fdb5/toc/TocRecord.h" - #include "fdb5/toc/TocCatalogue.h" #include "fdb5/toc/TocSerialisationVersion.h" @@ -50,7 +48,7 @@ class TocCatalogueWriter : public TocCatalogue, public CatalogueWriter { /// Mount an existing TocCatalogue, which has a different metadata key (within /// constraints) to allow on-line rebadging of data /// variableKeys: The keys that are allowed to differ between the two DBs - void overlayDB(const Catalogue& otherCatalogue, const std::set& variableKeys, bool unmount) override; + void overlayDB(const Catalogue* srcCatalogue, const eckit::StringSet& variableKeys, bool unmount) override; // Hide the contents of the DB!!! void hideContents() override; diff --git a/src/fdb5/tools/fdb-overlay.cc b/src/fdb5/tools/fdb-overlay.cc index 900364827..b0e97b2c5 100644 --- a/src/fdb5/tools/fdb-overlay.cc +++ b/src/fdb5/tools/fdb-overlay.cc @@ -8,56 +8,56 @@ * does it submit to any jurisdiction. 
*/ -#include "eckit/option/CmdArgs.h" -#include "eckit/option/SimpleOption.h" -#include "eckit/option/VectorOption.h" - +#include "fdb5/LibFdb5.h" +#include "fdb5/api/FDB.h" #include "fdb5/api/helpers/FDBToolRequest.h" #include "fdb5/config/Config.h" +#include "fdb5/database/Catalogue.h" #include "fdb5/database/Key.h" -#include "fdb5/LibFdb5.h" #include "fdb5/rules/Schema.h" -#include "fdb5/toc/TocEngine.h" #include "fdb5/tools/FDBTool.h" +#include "eckit/exception/Exceptions.h" +#include "eckit/log/Log.h" +#include "eckit/option/CmdArgs.h" +#include "eckit/option/SimpleOption.h" +#include "eckit/option/VectorOption.h" +#include "eckit/types/Types.h" + +#include +#include +#include + using namespace eckit; using namespace eckit::option; -namespace fdb5 { -namespace tools { +namespace fdb5::tools { //---------------------------------------------------------------------------------------------------------------------- class FdbOverlay : public FDBTool { -public: // methods - - FdbOverlay(int argc, char **argv) : - FDBTool(argc, argv), - variableKeys_{"class", "expver"}, - remove_(false), - force_(false) { +public: // methods + FdbOverlay(int argc, char** argv) : FDBTool(argc, argv) { options_.push_back(new VectorOption("variable-keys", - "The keys that may vary between mounted DBs", - 0, ",")); - options_.push_back(new SimpleOption("remove", "Remove a previously FDB overlay")); - options_.push_back(new SimpleOption("force", "Apply overlay even if target already exists")); + "The keywords that should vary between mounted DBs", 0, ",")); + options_.push_back(new SimpleOption("remove", "Remove/unmount existing FDB overlay")); + options_.push_back(new SimpleOption("force", "Apply overlay even if target DB already exists")); } -private: // methods - - virtual void init(const option::CmdArgs& args); - virtual void execute(const option::CmdArgs& args); - virtual void usage(const std::string &tool) const; +private: // methods + void init(const option::CmdArgs& args) 
override; + void execute(const option::CmdArgs& args) override; + void usage(const std::string& tool) const override; -private: // members +private: // members + eckit::StringSet variableKeys_ {"class", "expver"}; - std::vector variableKeys_; - bool remove_; - bool force_; + bool remove_ {false}; + bool force_ {false}; }; -void FdbOverlay::usage(const std::string &tool) const { +void FdbOverlay::usage(const std::string& tool) const { Log::info() << std::endl << "Usage: " << tool << " [options] [source DB request] [target DB request]" << std::endl @@ -68,9 +68,13 @@ void FdbOverlay::usage(const std::string &tool) const { void FdbOverlay::init(const option::CmdArgs& args) { FDBTool::init(args); - args.get("variable-keys", variableKeys_); - remove_ = args.getBool("remove", false); - force_ = args.getBool("force", false); + { + eckit::StringList keys; + args.get("variable-keys", keys); + variableKeys_ = {keys.begin(), keys.end()}; + } + remove_ = args.getBool("remove", remove_); + force_ = args.getBool("force", force_); } void FdbOverlay::execute(const option::CmdArgs& args) { @@ -90,82 +94,63 @@ void FdbOverlay::execute(const option::CmdArgs& args) { ASSERT(!sourceRequest.all()); ASSERT(!targetRequest.all()); - Config conf = config(args); - const Schema& schema = conf.schema(); + ASSERT(sourceRequest.request().count() == targetRequest.request().count()); - TypedKey source{conf.schema().registry()}; - TypedKey target{conf.schema().registry()}; - ASSERT(schema.expandFirstLevel(sourceRequest.request(), source)); - ASSERT(schema.expandFirstLevel(targetRequest.request(), target)); + //------------------------------------------------------------------------------------------------------------------ - if (remove_) { - Log::info() << "Removing " << source << " from " << target << std::endl; - } else { - Log::info() << "Applying " << source << " onto " << target << std::endl; - } + FDB fdb(config(args)); - if (source.keys() != target.keys()) { - std::stringstream ss; - ss << 
"Keys insufficiently matching for mount: " << source << " : " << target << std::endl; - throw UserError(ss.str(), Here()); - } + const auto& conf = fdb.config(); - std::set vkeys(variableKeys_.begin(), variableKeys_.end()); - for (const auto& kv : target) { - auto it = source.find(kv.first); - ASSERT(it != source.end()); - if (kv.second != it->second && vkeys.find(kv.first) == vkeys.end()) { - std::stringstream ss; - ss << "Key " << kv.first << " not allowed to differ between DBs: " << source << " : " << target; - throw UserError(ss.str(), Here()); - } - } + const auto& schema = conf.schema(); - std::unique_ptr dbSource = CatalogueReaderFactory::instance().build(source.canonical(), conf); - if (!dbSource->exists()) { - std::stringstream ss; - ss << "Source database not found: " << source << std::endl; - throw UserError(ss.str(), Here()); - } + TypedKey srcKey {schema.registry()}; + TypedKey tgtKey {schema.registry()}; - if (dbSource->type() != TocEngine::typeName()) { - std::stringstream ss; - ss << "Only TOC DBs currently supported" << std::endl; - throw UserError(ss.str(), Here()); - } - - std::unique_ptr dbTarget = CatalogueReaderFactory::instance().build(target.canonical(), conf); + ASSERT(schema.expandFirstLevel(sourceRequest.request(), srcKey)); + ASSERT(schema.expandFirstLevel(targetRequest.request(), tgtKey)); if (remove_) { - if (!dbTarget->exists()) { - std::stringstream ss; - ss << "Target database must already exist: " << target << std::endl; - throw UserError(ss.str(), Here()); - } + Log::info() << "Removing " << srcKey << " from " << tgtKey << std::endl; } else { - if (dbTarget->exists() && !force_) { - std::stringstream ss; - ss << "Target database already exists: " << target << std::endl; - eckit::Log::error() << ss.str() << std::endl; - eckit::Log::error() << "To mount to existing target, rerun with --force" << std::endl; - throw UserError(ss.str(), Here()); + Log::info() << "Applying " << srcKey << " onto " << tgtKey << std::endl; + } + + 
//------------------------------------------------------------------------------------------------------------------ + + { + auto dbTarget = CatalogueReaderFactory::instance().build(tgtKey.canonical(), conf); + + const auto& exists = dbTarget->exists(); + + if (remove_) { + if (!exists) { + std::ostringstream oss; + oss << "The '--remove' option expects an existing target database! Key: " << tgtKey << std::endl; + throw eckit::UserError(oss.str(), Here()); + } + } else { + if (exists && !force_) { + std::ostringstream oss; + oss << "The target database already exists! Key: " << tgtKey << std::endl; + eckit::Log::error() << oss.str() << std::endl; + eckit::Log::error() << "To overlay an existing target database, re-run with `--force`" << std::endl; + throw eckit::UserError(oss.str(), Here()); + } } } - ASSERT(dbTarget->uri() != dbSource->uri()); + auto dbTarget = CatalogueWriterFactory::instance().build(tgtKey.canonical(), conf); + auto dbSource = CatalogueReaderFactory::instance().build(srcKey.canonical(), conf); - std::unique_ptr newCatalogue = CatalogueWriterFactory::instance().build(target.canonical(), conf); - if (newCatalogue->type() == TocEngine::typeName() && dbSource->type() == TocEngine::typeName()) { - newCatalogue->overlayDB(*dbSource, vkeys, remove_); - } + dbTarget->overlayDB(dbSource.get(), variableKeys_, remove_); } //---------------------------------------------------------------------------------------------------------------------- -} // namespace tools -} // namespace fbb5 +} // namespace fdb5::tools -int main(int argc, char **argv) { +int main(int argc, char** argv) { fdb5::tools::FdbOverlay app(argc, argv); return app.start(); }