From f0ce6ef4776836661f950cb468cc5ea018148f4e Mon Sep 17 00:00:00 2001 From: Nick Date: Fri, 4 Oct 2019 21:27:42 -0400 Subject: [PATCH] Updated source code. Removed dependency of boost. Added better naming convention according to the C++ core guidelines. Fixed some APIs. Added source folder. More changes eventually will come. --- .gitignore | 9 +- LICENSE | 1 + README.md | 157 ++--- examples/CMakeLists.txt | 30 - examples/async_basic_pub-sub.cpp | 52 ++ examples/async_pubsub.cpp | 177 ------ examples/async_pubsub2.cpp | 79 --- examples/async_set_get.cpp | 84 --- examples/async_set_get2.cpp | 51 -- examples/async_timeout.cpp | 110 ---- examples/benchmark.cpp | 245 -------- examples/sync_basic_get-set.cpp | 54 ++ examples/sync_benchmark.cpp | 133 ----- examples/sync_pipeline.cpp | 50 -- examples/sync_set_get.cpp | 48 -- examples/sync_timeout.cpp | 51 -- include/redis/async_client.hpp | 99 ++++ include/redis/base_client.hpp | 127 ++++ include/redis/buffer.hpp | 53 ++ include/redis/config.hpp | 27 + include/redis/parser.hpp | 82 +++ include/redis/sync_client.hpp | 55 ++ include/redis/value.hpp | 102 ++++ include/redis/version.hpp | 6 + src/CMakeLists.txt | 1 - src/async_client.cpp | 121 ++++ src/base_client.cpp | 317 ++++++++++ src/parser.cpp | 389 +++++++++++++ src/redisclient/CMakeLists.txt | 49 -- src/redisclient/config.h | 34 -- src/redisclient/impl/pipeline.cpp | 40 -- src/redisclient/impl/redisasyncclient.cpp | 200 ------- src/redisclient/impl/redisclientimpl.cpp | 674 ---------------------- src/redisclient/impl/redisclientimpl.h | 153 ----- src/redisclient/impl/redisparser.cpp | 432 -------------- src/redisclient/impl/redissyncclient.cpp | 321 ----------- src/redisclient/impl/redisvalue.cpp | 198 ------- src/redisclient/impl/throwerror.h | 28 - src/redisclient/pipeline.h | 58 -- src/redisclient/redisasyncclient.h | 117 ---- src/redisclient/redisbuffer.h | 64 -- src/redisclient/redisparser.h | 95 --- src/redisclient/redissyncclient.h | 109 ---- src/redisclient/redisvalue.h | 126 ---- src/redisclient/version.h | 11 - src/sync_client.cpp | 87 +++ src/value.cpp | 124 ++++ 47 files changed, 1746 insertions(+), 3884 deletions(-) delete mode 100644 examples/CMakeLists.txt create mode 100644 examples/async_basic_pub-sub.cpp delete mode 100644 examples/async_pubsub.cpp delete mode 100644 examples/async_pubsub2.cpp delete mode 100644 examples/async_set_get.cpp delete mode 100644 examples/async_set_get2.cpp delete mode 100644 examples/async_timeout.cpp delete mode 100644 examples/benchmark.cpp create mode 100644 examples/sync_basic_get-set.cpp delete mode 100644 examples/sync_benchmark.cpp delete mode 100644 examples/sync_pipeline.cpp delete mode 100644 examples/sync_set_get.cpp delete mode 100644 examples/sync_timeout.cpp create mode 100644 include/redis/async_client.hpp create mode 100644 include/redis/base_client.hpp create mode 100644 include/redis/buffer.hpp create mode 100644 include/redis/config.hpp create mode 100644 include/redis/parser.hpp create mode 100644 include/redis/sync_client.hpp create mode 100644 include/redis/value.hpp create mode 100644 include/redis/version.hpp delete mode 100644 src/CMakeLists.txt create mode 100644 src/async_client.cpp create mode 100644 src/base_client.cpp create mode 100644 src/parser.cpp delete mode 100644 src/redisclient/CMakeLists.txt delete mode 100644 src/redisclient/config.h delete mode 100644 src/redisclient/impl/pipeline.cpp delete mode 100644 src/redisclient/impl/redisasyncclient.cpp delete mode 100644 src/redisclient/impl/redisclientimpl.cpp delete mode 100644 src/redisclient/impl/redisclientimpl.h delete mode 100644 src/redisclient/impl/redisparser.cpp delete mode 100644 src/redisclient/impl/redissyncclient.cpp delete mode 100644 src/redisclient/impl/redisvalue.cpp delete mode 100644 src/redisclient/impl/throwerror.h delete mode 100644 src/redisclient/pipeline.h delete mode 100644 src/redisclient/redisasyncclient.h delete mode 100644 src/redisclient/redisbuffer.h delete mode 100644 src/redisclient/redisparser.h delete mode 100644 src/redisclient/redissyncclient.h delete mode 100644 src/redisclient/redisvalue.h delete mode 100644 src/redisclient/version.h create mode 100644 src/sync_client.cpp create mode 100644 src/value.cpp diff --git a/.gitignore b/.gitignore index ed94cbc..6e26ae5 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ -# vim files -*.swp -tags -build/* +*.vcxproj + +*.filters + +*.user diff --git a/LICENSE b/LICENSE index 85c98d8..00c5664 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,7 @@ The MIT License (MIT) Copyright (c) 2014 Alex Nekipelov +Copyright (c) 2019 Nick P (Kurieita) Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in diff --git a/README.md b/README.md index f994c44..ea0a2e4 100644 --- a/README.md +++ b/README.md @@ -1,129 +1,62 @@ -redisclient -=========== +A header-only redis client library based on redisclient (https://github.com/nekipelov/redisclient/) that was modified to remove boost.asio as a dependency. Simple but powerfull. +Currently the only dependency is asio but there are future plans to remove networking as a dependency. -Build master status: [![Build travis status](https://travis-ci.org/nekipelov/redisclient.svg?branch=master)](https://travis-ci.org/nekipelov/redisclient) -[![Build appveyor status](https://ci.appveyor.com/api/projects/status/github/nekipelov/redisclient?branch=master)](https://ci.appveyor.com/project/nekipelov/redisclient/branch/master) - -Build develop status: [![Build travis status](https://travis-ci.org/nekipelov/redisclient.svg?branch=develop)](https://travis-ci.org/nekipelov/redisclient) -[![Build appveyor status](https://ci.appveyor.com/api/projects/status/github/nekipelov/redisclient?branch=develop)](https://ci.appveyor.com/project/nekipelov/redisclient/branch/develop) - -Current version: 1.0.2. - -Boost.asio based Redis-client header-only library. Simple but powerfull. +Also you can build the library like a shared library. Just use -This version requires ะก++11 compiler. If you want to use this library with old compiler, use version 0.4: https://github.com/nekipelov/redisclient/tree/v0.4. +-DREDIS_CLIENT_DYNLIB and -DREDIS_CLIENT_BUILD to build the library and +-DREDIS_CLIENT_DYNLIB to build your project. -Have amalgamated sources. See files in the `amalgamated` directory. -Get/set example: +Synchronous get/set example: ```cpp #include #include #include -#include -#include -#include - -int main(int, char **) -{ - boost::asio::ip::address address = boost::asio::ip::address::from_string("127.0.0.1"); - const unsigned short port = 6379; - boost::asio::ip::tcp::endpoint endpoint(address, port); - - boost::asio::io_service ioService; - redisclient::RedisSyncClient redis(ioService); - boost::system::error_code ec; - - redis.connect(endpoint, ec); - - if(ec) - { - std::cerr << "Can't connect to redis: " << ec.message() << std::endl; - return EXIT_FAILURE; - } - - redisclient::RedisValue result; - - result = redis.command("SET", {"key", "value"}); - - if( result.isError() ) - { - std::cerr << "SET error: " << result.toString() << "\n"; - return EXIT_FAILURE; - } - - result = redis.command("GET", {"key"}); - - if( result.isOk() ) - { - std::cout << "GET: " << result.toString() << "\n"; - return EXIT_SUCCESS; - } - else - { - std::cerr << "GET error: " << result.toString() << "\n"; - return EXIT_FAILURE; - } -} -``` -Async get/set example: +#include +#include -```cpp -#include -#include -#include -#include -#include - -#include - -static const std::string redisKey = "unique-redis-key-example"; -static const std::string redisValue = "unique-redis-value"; - -void handleConnected(boost::asio::io_service &ioService, redisclient::RedisAsyncClient &redis, - boost::system::error_code ec) -{ - if( !ec ) - { - redis.command("SET", {redisKey, redisValue}, [&](const redisclient::RedisValue &v) { - std::cerr << "SET: " << v.toString() << std::endl; - - redis.command("GET", {redisKey}, [&](const redisclient::RedisValue &v) { - std::cerr << "GET: " << v.toString() << std::endl; - - redis.command("DEL", {redisKey}, [&](const redisclient::RedisValue &) { - ioService.stop(); - }); - }); - }); - } - else - { - std::cerr << "Can't connect to redis: " << ec.message() << std::endl; - } -} +#include "redis/sync_client.h" +#include "redis/async_client.h" -int main(int, char **) -{ - boost::asio::ip::address address = boost::asio::ip::address::from_string("127.0.0.1"); - const unsigned short port = 6379; - boost::asio::ip::tcp::endpoint endpoint(address, port); +int main(int, char**) { + asio::ip::address address = asio::ip::address::from_string("127.0.0.1"); + const unsigned short port = 6379; - boost::asio::io_service ioService; - redisclient::RedisAsyncClient redis(ioService); + asio::ip::tcp::endpoint endpoint(address, port); - redis.connect(endpoint, - std::bind(&handleConnected, std::ref(ioService), std::ref(redis), - std::placeholders::_1)); + asio::io_service io; + redis::sync_client client(io); - ioService.run(); + asio::error_code ec; + std::string msg = "something"; - return 0; -} -``` + client.connect(endpoint, msg); -Also you can build the library like a shared library. Just use - -DREDIS_CLIENT_DYNLIB and -DREDIS_CLIENT_BUILD to build redisclient -and -DREDIS_CLIENT_DYNLIB to build your project. + if (ec) { + std::cerr << "Can't connect to redis: " << ec.message() << std::endl; + return EXIT_FAILURE; + } + + redis::value result; + result = client.command("SET", { "key", "value" }); + + if (result.isError()) { + std::cerr << "SET error: " << result.toString() << "\n"; + std::cin.get(); + return EXIT_FAILURE; + } + + result = client.command("GET", { "key" }); + + if (result.isOk()) { + std::cout << "GET: " << result.toString() << "\n"; + std::cin.get(); + return EXIT_SUCCESS; + } else { + std::cerr << "GET error: " << result.toString() << "\n"; + std::cin.get(); + return EXIT_FAILURE; + } +} diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt deleted file mode 100644 index e6b084b..0000000 --- a/examples/CMakeLists.txt +++ /dev/null @@ -1,30 +0,0 @@ -set(EXAMPLES - async_pubsub2.cpp - async_pubsub.cpp - async_set_get2.cpp - async_set_get.cpp - async_timeout.cpp - sync_pipeline.cpp - sync_set_get.cpp - benchmark.cpp - sync_benchmark.cpp - sync_timeout.cpp -) - -foreach(EXAMPLE ${EXAMPLES}) - get_filename_component(EXECUTABLE ${EXAMPLE} NAME_WE) - add_executable(${EXECUTABLE} ${EXAMPLE}) - target_link_libraries(${EXECUTABLE} - RedisClient - ${Boost_DATE_TIME_LIBRARY}) -endforeach() - -target_link_libraries(benchmark - RedisClient - ${Boost_PROGRAM_OPTIONS_LIBRARY} -) - -target_link_libraries(sync_benchmark - RedisClient - ${Boost_PROGRAM_OPTIONS_LIBRARY} -) diff --git a/examples/async_basic_pub-sub.cpp b/examples/async_basic_pub-sub.cpp new file mode 100644 index 0000000..9b579ea --- /dev/null +++ b/examples/async_basic_pub-sub.cpp @@ -0,0 +1,52 @@ +#include + +#include + +static const std::string channelName = "unique-redis-channel-name-example"; + +using namespace redis; + +void subscribeHandler(asio::io_service& ioService, const std::vector& buf) { + std::string msg(buf.begin(), buf.end()); + std::cout << msg << std::endl; + + if (msg == "stop") ioService.stop(); +} + +void publishHandler(async_client& publisher, const value&) { + publisher.publish(channelName, "First hello", [&](const value&) { + publisher.publish(channelName, "Last hello", [&](const value&) { + publisher.publish(channelName, "stop"); + }); + }); +} + +int main(int, char**) { + asio::ip::address address = asio::ip::address::from_string("127.0.0.1"); + const unsigned short port = 6379; + + asio::io_service ioService; + async_client publisher(ioService); + async_client subscriber(ioService); + + publisher.connect(address, port, [&](bool status, const std::string& err) { + if (!status) { + std::cerr << "Can't connect to to redis" << err << std::endl; + std::cin.get(); + } else { + subscriber.connect(address, port, [&](bool status, const std::string& err) { + if (!status) { + + std::cerr << "Can't connect to to redis" << err << std::endl; + std::cin.get(); + } else { + subscriber.subscribe(channelName, std::bind(&subscribeHandler, std::ref(ioService), std::placeholders::_1), std::bind(&publishHandler, std::ref(publisher), std::placeholders::_1)); + } + }); + } + }); + + ioService.run(); + + return 0; +} \ No newline at end of file diff --git a/examples/async_pubsub.cpp b/examples/async_pubsub.cpp deleted file mode 100644 index c4d5362..0000000 --- a/examples/async_pubsub.cpp +++ /dev/null @@ -1,177 +0,0 @@ -#include -#include -#include -#include -#include - -#include - -using namespace redisclient; - -static const std::string channelName = "unique-redis-channel-name-example"; -static const boost::posix_time::seconds timeout(1); - -class Client -{ -public: - Client(boost::asio::io_service &ioService, - const boost::asio::ip::address &address, - unsigned short port) - : ioService(ioService), publishTimer(ioService), - connectSubscriberTimer(ioService), connectPublisherTimer(ioService), - address(address), port(port), - publisher(ioService), subscriber(ioService) - { - publisher.installErrorHandler(std::bind(&Client::connectPublisher, this)); - subscriber.installErrorHandler(std::bind(&Client::connectSubscriber, this)); - } - - void publish(const std::string &str) - { - publisher.publish(channelName, str); - } - - void start() - { - connectPublisher(); - connectSubscriber(); - } - -protected: - void errorPubProxy(const std::string &) - { - publishTimer.cancel(); - connectPublisher(); - } - - void errorSubProxy(const std::string &) - { - connectSubscriber(); - } - - void connectPublisher() - { - std::cerr << "connectPublisher\n"; - - if( publisher.state() == RedisAsyncClient::State::Connected ) - { - std::cerr << "disconnectPublisher\n"; - - publisher.disconnect(); - publishTimer.cancel(); - } - - boost::asio::ip::tcp::endpoint endpoint(address, port); - publisher.connect(endpoint, - std::bind(&Client::onPublisherConnected, this, std::placeholders::_1)); - } - - void connectSubscriber() - { - std::cerr << "connectSubscriber\n"; - - if( subscriber.state() == RedisAsyncClient::State::Connected || - subscriber.state() == RedisAsyncClient::State::Subscribed ) - { - std::cerr << "disconnectSubscriber\n"; - subscriber.disconnect(); - } - - boost::asio::ip::tcp::endpoint endpoint(address, port); - subscriber.connect(endpoint, - std::bind(&Client::onSubscriberConnected, this, std::placeholders::_1)); - } - - void callLater(boost::asio::deadline_timer &timer, - void(Client::*callback)()) - { - std::cerr << "callLater\n"; - timer.expires_from_now(timeout); - timer.async_wait([callback, this](const boost::system::error_code &ec) { - if( !ec ) - { - (this->*callback)(); - } - }); - } - - void onPublishTimeout() - { - static size_t counter = 0; - std::string msg = str(boost::format("message %1%") % counter++); - - if( publisher.state() == RedisAsyncClient::State::Connected ) - { - std::cerr << "pub " << msg << "\n"; - publish(msg); - } - - callLater(publishTimer, &Client::onPublishTimeout); - } - - void onPublisherConnected(boost::system::error_code ec) - { - if(ec) - { - std::cerr << "onPublisherConnected: can't connect to redis: " << ec.message() << "\n"; - callLater(connectPublisherTimer, &Client::connectPublisher); - } - else - { - std::cerr << "onPublisherConnected ok\n"; - - callLater(publishTimer, &Client::onPublishTimeout); - } - } - - void onSubscriberConnected(boost::system::error_code ec) - { - if( ec ) - { - std::cerr << "onSubscriberConnected: can't connect to redis: " << ec.message() << "\n"; - callLater(connectSubscriberTimer, &Client::connectSubscriber); - } - else - { - std::cerr << "onSubscriberConnected ok\n"; - subscriber.subscribe(channelName, - std::bind(&Client::onMessage, this, std::placeholders::_1)); - subscriber.psubscribe("*", - std::bind(&Client::onMessage, this, std::placeholders::_1)); - } - } - - void onMessage(const std::vector &buf) - { - std::string s(buf.begin(), buf.end()); - std::cout << "onMessage: " << s << "\n"; - } - -private: - boost::asio::io_service &ioService; - boost::asio::deadline_timer publishTimer; - boost::asio::deadline_timer connectSubscriberTimer; - boost::asio::deadline_timer connectPublisherTimer; - const boost::asio::ip::address address; - const unsigned short port; - - RedisAsyncClient publisher; - RedisAsyncClient subscriber; -}; - -int main(int, char **) -{ - boost::asio::ip::address address = boost::asio::ip::address::from_string("127.0.0.1"); - const unsigned short port = 6379; - - boost::asio::io_service ioService; - - Client client(ioService, address, port); - - client.start(); - ioService.run(); - - std::cerr << "done\n"; - - return 0; -} diff --git a/examples/async_pubsub2.cpp b/examples/async_pubsub2.cpp deleted file mode 100644 index 0346633..0000000 --- a/examples/async_pubsub2.cpp +++ /dev/null @@ -1,79 +0,0 @@ -#include -#include -#include -#include - -#include - -static const std::string channelName = "unique-redis-channel-name-example"; - - -class Client -{ -public: - Client(boost::asio::io_service &ioService) - : ioService(ioService) - {} - - void onMessage(const std::vector &buf) - { - std::string msg(buf.begin(), buf.end()); - - std::cerr << "Message: " << msg << std::endl; - - if( msg == "stop" ) - ioService.stop(); - } - -private: - boost::asio::io_service &ioService; -}; - -void publishHandler(redisclient::RedisAsyncClient &publisher, const redisclient::RedisValue &) -{ - publisher.publish(channelName, "First hello", [&](const redisclient::RedisValue &) { - publisher.publish(channelName, "Last hello", [&](const redisclient::RedisValue &) { - publisher.publish(channelName, "stop"); - }); - }); -} - -int main(int, char **) -{ - boost::asio::ip::address address = boost::asio::ip::address::from_string("127.0.0.1"); - const unsigned short port = 6379; - boost::asio::ip::tcp::endpoint endpoint(address, port); - - boost::asio::io_service ioService; - redisclient::RedisAsyncClient publisher(ioService); - redisclient::RedisAsyncClient subscriber(ioService); - Client client(ioService); - - publisher.connect(endpoint, [&](boost::system::error_code ec) - { - if(ec) - { - std::cerr << "Can't connect to redis: " << ec.message() << std::endl; - } - else - { - subscriber.connect(endpoint, [&](boost::system::error_code ec) - { - if( ec ) - { - std::cerr << "Can't connect to redis: " << ec.message() << std::endl; - } - else - { - subscriber.subscribe(channelName, - std::bind(&Client::onMessage, &client, std::placeholders::_1), - std::bind(&publishHandler, std::ref(publisher), std::placeholders::_1)); - } - }); - } - }); - - ioService.run(); - - return 0; -} diff --git a/examples/async_set_get.cpp b/examples/async_set_get.cpp deleted file mode 100644 index ba62e70..0000000 --- a/examples/async_set_get.cpp +++ /dev/null @@ -1,84 +0,0 @@ -#include -#include -#include -#include - -#include - -static const std::string redisKey = "unique-redis-key-example"; -static const std::string redisValue = "unique-redis-value"; - -class Worker -{ -public: - Worker(boost::asio::io_service &ioService, redisclient::RedisAsyncClient &redisClient) - : ioService(ioService), redisClient(redisClient) - {} - - void onConnect(boost::system::error_code ec); - void onSet(const redisclient::RedisValue &value); - void onGet(const redisclient::RedisValue &value); - void stop(); - -private: - boost::asio::io_service &ioService; - redisclient::RedisAsyncClient &redisClient; -}; - -void Worker::onConnect(boost::system::error_code ec) -{ - if(ec) - { - std::cerr << "Can't connect to redis: " << ec.message() << "\n"; - } - else - { - redisClient.command("SET", {redisKey, redisValue}, - std::bind(&Worker::onSet, this, std::placeholders::_1)); - } -} - -void Worker::onSet(const redisclient::RedisValue &value) -{ - std::cerr << "SET: " << value.toString() << std::endl; - if( value.toString() == "OK" ) - { - redisClient.command("GET", {redisKey}, - std::bind(&Worker::onGet, this, std::placeholders::_1)); - } - else - { - std::cerr << "Invalid value from redis: " << value.toString() << std::endl; - } -} - -void Worker::onGet(const redisclient::RedisValue &value) -{ - std::cerr << "GET " << value.toString() << std::endl; - if( value.toString() != redisValue ) - { - std::cerr << "Invalid value from redis: " << value.toString() << std::endl; - } - - redisClient.command("DEL", {redisKey}, - std::bind(&boost::asio::io_service::stop, std::ref(ioService))); -} - - -int main(int, char **) -{ - boost::asio::ip::address address = boost::asio::ip::address::from_string("127.0.0.1"); - const int port = 6379; - boost::asio::ip::tcp::endpoint endpoint(address, port); - - boost::asio::io_service ioService; - redisclient::RedisAsyncClient client(ioService); - Worker worker(ioService, client); - - client.connect(endpoint, std::bind(&Worker::onConnect, &worker, - std::placeholders::_1)); - - ioService.run(); - - return 0; -} diff --git a/examples/async_set_get2.cpp b/examples/async_set_get2.cpp deleted file mode 100644 index 34f50b0..0000000 --- a/examples/async_set_get2.cpp +++ /dev/null @@ -1,51 +0,0 @@ -#include -#include -#include -#include -#include - -#include - -static const std::string redisKey = "unique-redis-key-example"; -static const std::string redisValue = "unique-redis-value"; - -void handleConnected(boost::asio::io_service &ioService, redisclient::RedisAsyncClient &redis, - boost::system::error_code ec) -{ - if( !ec ) - { - redis.command("SET", {redisKey, redisValue}, [&](const redisclient::RedisValue &v) { - std::cerr << "SET: " << v.toString() << std::endl; - - redis.command("GET", {redisKey}, [&](const redisclient::RedisValue &v) { - std::cerr << "GET: " << v.toString() << std::endl; - - redis.command("DEL", {redisKey}, [&](const redisclient::RedisValue &) { - ioService.stop(); - }); - }); - }); - } - else - { - std::cerr << "Can't connect to redis: " << ec.message() << std::endl; - } -} - -int main(int, char **) -{ - boost::asio::ip::address address = boost::asio::ip::address::from_string("127.0.0.1"); - const unsigned short port = 6379; - boost::asio::ip::tcp::endpoint endpoint(address, port); - - boost::asio::io_service ioService; - redisclient::RedisAsyncClient redis(ioService); - - redis.connect(endpoint, - std::bind(&handleConnected, std::ref(ioService), std::ref(redis), - std::placeholders::_1)); - - ioService.run(); - - return 0; -} diff --git a/examples/async_timeout.cpp b/examples/async_timeout.cpp deleted file mode 100644 index 859113d..0000000 --- a/examples/async_timeout.cpp +++ /dev/null @@ -1,110 +0,0 @@ -#include -#include -#include -#include -#include - -#include - -static const std::string redisKey = "unique-redis-key-example"; - -class Worker -{ -public: - Worker(boost::asio::io_service &ioService) - : ioService(ioService), redisClient(ioService), timer(ioService), - timeout(boost::posix_time::seconds(3)) - { - } - - void run(); - void stop(); - -protected: - void onConnect(boost::system::error_code ec); - void onGet(const redisclient::RedisValue &value); - void get(); - - void onTimeout(const boost::system::error_code &ec); - -private: - boost::asio::io_service &ioService; - redisclient::RedisAsyncClient redisClient; - boost::asio::deadline_timer timer; - boost::posix_time::time_duration timeout; -}; - -void Worker::run() -{ - const char *address = "127.0.0.1"; - const int port = 6379; - - boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::address::from_string(address), port); - - timer.expires_from_now(timeout); - timer.async_wait(std::bind(&Worker::onTimeout, this, std::placeholders::_1)); - - redisClient.connect(endpoint, std::bind(&Worker::onConnect, this, - std::placeholders::_1)); -} - -void Worker::onConnect(boost::system::error_code ec) -{ - if (ec) - { - if (ec != boost::asio::error::operation_aborted) - { - timer.cancel(); - std::cerr << "Can't connect to redis: " << ec.message() << "\n"; - } - } - else - { - std::cerr << "connected\n"; - get(); - } -} - -void Worker::onGet(const redisclient::RedisValue &value) -{ - timer.cancel(); - std::cerr << "GET " << value.toString() << std::endl; - sleep(1); - - get(); -} - -void Worker::get() -{ - timer.expires_from_now(timeout); - timer.async_wait(std::bind(&Worker::onTimeout, this, std::placeholders::_1)); - - redisClient.command("GET", {redisKey}, - std::bind(&Worker::onGet, this, std::placeholders::_1)); -} - - -void Worker::onTimeout(const boost::system::error_code &ec) -{ - if (!ec) - { - std::cerr << "timeout!\n"; - boost::system::error_code ignore_ec; - - redisClient.disconnect(); - // try again - run(); - } -} - -int main(int, char **) -{ - boost::asio::io_service ioService; - Worker worker(ioService); - - worker.run(); - - ioService.run(); - - return 0; -} diff --git a/examples/benchmark.cpp b/examples/benchmark.cpp deleted file mode 100644 index 675e44c..0000000 --- a/examples/benchmark.cpp +++ /dev/null @@ -1,245 +0,0 @@ -#include -#include -#include -#include - -#include -#include -#include -#include -#include - -#include - - -struct Config -{ - std::string address; - uint16_t port; - bool connectionPerWorker; - size_t workersCount; -}; - -// Fetch operations-per-second value from redis-server -class OpsFetcher : public std::enable_shared_from_this { -public: - OpsFetcher(boost::asio::io_service &ioService, - std::shared_ptr redisClient) - : ioService(ioService), timer(ioService), redisClient(redisClient), - timeout(boost::posix_time::seconds(1)) - { - } - - void run() - { - resetTimeout(); - } - -protected: - void onTimeout(const boost::system::error_code &errorCode) - { - if( errorCode != boost::asio::error::operation_aborted ) - { - resetTimeout(); - redisClient->command("info", {"stats"}, - std::bind(&OpsFetcher::printOPS, shared_from_this(), std::placeholders::_1)); - } - } - - void resetTimeout() - { - timer.expires_from_now(timeout); - timer.async_wait(std::bind(&OpsFetcher::onTimeout, shared_from_this(), std::placeholders::_1)); - } - - void printOPS(const redisclient::RedisValue &stats) - { - static const std::string key("instantaneous_ops_per_sec"); - static const std::regex expression("instantaneous_ops_per_sec:(\\d+)"); - - std::smatch matches; - std::string s = stats.toString(); - - if( std::regex_search(s, matches, expression) ) { - unsigned long ops = std::stoul(matches[1]); - - std::cout << "redis server ops: " << ops << std::endl; - } - } - -private: - boost::asio::io_service &ioService; - boost::asio::deadline_timer timer; - std::shared_ptr redisClient; - const boost::posix_time::time_duration timeout; - -}; - -class Worker : public std::enable_shared_from_this -{ -public: - Worker(boost::asio::io_service &ioService, - std::shared_ptr redisClient) - : ioService(ioService), redisClient(redisClient) - { - } - - ~Worker() - { - } - - void run() - { - work(); - } - - void work() - { - static int i = 0; - std::string key = str(boost::format("key %1%") % ++i); - auto self = shared_from_this(); - - redisClient->command("SET", {key, key}, [key, self](const redisclient::RedisValue &) { - self->redisClient->command("GET", {key}, [key, self](const redisclient::RedisValue &result) { - assert(result.toString() == key); - (void)result; // fix unused warning - - self->redisClient->command("DEL", {key}, [key, self](const redisclient::RedisValue &) { - self->work(); - }); - }); - }); - } - -private: - boost::asio::io_service &ioService; - std::shared_ptr redisClient; -}; - -class Benchmark -{ -public: - Benchmark(boost::asio::io_service &ioService, const Config &config) - : ioService(ioService), config(config) - { - } - - void run() - { - boost::asio::ip::tcp::endpoint endpoint( - boost::asio::ip::address::from_string(config.address), config.port); - - if( config.connectionPerWorker ) - { - for(size_t i = 0; i < config.workersCount; ++i) - { - auto client = std::make_shared(ioService); - auto worker = std::make_shared(ioService, client); - std::function callback = std::bind(&Benchmark::runWorker, this, worker); - - client->connect(endpoint, - std::bind(&Benchmark::onConnect, this, - std::placeholders::_1, callback)); - } - { - auto client = std::make_shared(ioService); - auto rpcFetcher = std::make_shared(ioService, client); - std::function callback = std::bind(&Benchmark::runRpcFetcher, this, rpcFetcher); - - client->connect(endpoint, std::bind(&Benchmark::onConnect, this, - std::placeholders::_1, callback)); - } - } - else - { - auto client = std::make_shared(ioService); - - client->connect(endpoint, std::bind(&Benchmark::onConnect, this, - std::placeholders::_1, [=]() { - for(size_t i = 0; i < config.workersCount; ++i) - { - auto worker = std::make_shared(ioService, client); - runWorker(worker); - } - - auto rpcFetcher = std::make_shared(ioService, client); - runRpcFetcher(rpcFetcher); - })); - } - } - -private: - void runWorker(std::shared_ptr worker) - { - worker->run(); - } - - void runRpcFetcher(std::shared_ptr rpcFetcher) - { - rpcFetcher->run(); - } - - void onConnect(boost::system::error_code ec, std::function callback) - { - if(ec) - { - std::cerr << "Can't connect to redis " << config.address << ":" << config.port - << ": " << ec.message() << "\n"; - ioService.stop(); - } - else - { - callback(); - } - } - -private: - boost::asio::io_service &ioService; - Config config; -}; - -int main(int argc, char **argv) -{ - namespace po = boost::program_options; - - Config config; - - po::options_description description("Options"); - description.add_options() - ("help", "produce help message") - ("address", po::value(&config.address)->default_value("127.0.0.1"), - "redis server ip address") - ("port", po::value(&config.port)->default_value(6379), "redis server port") - ("workers", po::value(&config.workersCount)->default_value(50), "count of workers") - ("connection-per-worker", po::value(&config.connectionPerWorker)->default_value(false), - "if true, create connection per worker") - ; - - po::variables_map vm; - - try - { - po::store(po::parse_command_line(argc, argv, description), vm); - po::notify(vm); - } - catch(const po::error &e) - { - std::cerr << e.what() << "\n"; - return EXIT_FAILURE; - } - - if( vm.count("help") ) - { - std::cout << description << "\n"; - return EXIT_SUCCESS; - } - - boost::asio::io_service ioService; - Benchmark benchmark(ioService, config); - - benchmark.run(); - ioService.run(); - - return 0; -} - diff --git a/examples/sync_basic_get-set.cpp b/examples/sync_basic_get-set.cpp new file mode 100644 index 0000000..705102a --- /dev/null +++ b/examples/sync_basic_get-set.cpp @@ -0,0 +1,54 @@ +#include + +#include + +static const std::string channelName = "unique-redis-channel-name-example"; + +using namespace redis; + +void subscribeHandler(asio::io_service& ioService, const std::vector& buf) { + std::string msg(buf.begin(), buf.end()); + std::cout << msg << std::endl; + + if (msg == "stop") ioService.stop(); +} + +void publishHandler(async_client& publisher, const value&) { + publisher.publish(channelName, "First hello", [&](const value&) { + publisher.publish(channelName, "Last hello", [&](const value&) { + publisher.publish(channelName, "stop"); + }); + }); +} + +int main(int, char**) { + asio::ip::address address = asio::ip::address::from_string("127.0.0.1"); + const unsigned short port = 6379; + + asio::io_service ioService; + async_client publisher(ioService); + async_client subscriber(ioService); + + publisher.connect(address, port, [&](bool status, const std::string& err) { + if (!status) { + std::cerr << "Can't connect to to redis" << err << std::endl; + std::cin.get(); + } + else { + subscriber.connect(address, port, [&](bool status, const std::string& err) { + if (!status) { + + std::cerr << "Can't connect to to redis" << err << std::endl; + std::cin.get(); + } + else { + subscriber.subscribe(channelName, std::bind(&subscribeHandler, std::ref(ioService), std::placeholders::_1), std::bind(&publishHandler, std::ref(publisher), std::placeholders::_1)); + } + }); + } + }); + + ioService.run(); + + return 0; +} \ No newline at end of file diff --git a/examples/sync_benchmark.cpp b/examples/sync_benchmark.cpp deleted file mode 100644 index 35ca3f4..0000000 --- a/examples/sync_benchmark.cpp +++ /dev/null @@ -1,133 +0,0 @@ -#include -#include -#include -#include - -#include -#include -#include -#include - -#include -#include - - -struct Config -{ - std::string address; - uint16_t port; -}; - -class Benchmark -{ -public: - Benchmark(boost::asio::io_service &ioService, const Config &config) - : ioService(ioService), config(config) - { - } - - void run() - { - boost::asio::ip::tcp::endpoint endpoint( - boost::asio::ip::address::from_string(config.address), config.port); - - - redisclient::RedisSyncClient redisClient(ioService); - boost::system::error_code ec; - - redisClient.connect(endpoint, ec); - - if (ec) - { - std::cerr << "Can't connect to " << config.address << ":" << config.port - << ": " << ec.message() << "\n"; - exit(-1); - } - size_t i = 0; - - for(;;) - { - std::string key = str(boost::format("key %1%") % ++i); - - if (redisClient.command("SET", {key, key}) != "OK") - abort(); - if (redisClient.command("GET", {key}) != key) - abort(); - if (redisClient.command("DEL", {key}) != 1) - abort(); - - printOPS(redisClient); - } - } - -private: - void printOPS(redisclient::RedisSyncClient &redisClient) - { - static time_t lastTime = time(0); - const time_t timeout = 1; // 1 sec; - - if (time(0) >= lastTime + timeout) - { - redisclient::RedisValue stats = redisClient.command("info", {"stats"}); - lastTime = time(0); - - static const std::string key("instantaneous_ops_per_sec"); - static const std::regex expression("instantaneous_ops_per_sec:(\\d+)"); - - std::smatch matches; - std::string s = stats.toString(); - - if( std::regex_search(s, matches, expression) ) { - unsigned long ops = std::stoul(matches[1]); - - std::cout << "redis server ops: " << ops << std::endl; - } - } - } - -private: - boost::asio::io_service &ioService; - Config config; -}; - -int main(int argc, char **argv) -{ - namespace po = boost::program_options; - - Config config; - - po::options_description description("Options"); - description.add_options() - ("help", "produce help message") - ("address", po::value(&config.address)->default_value("127.0.0.1"), - "redis server ip address") - ("port", po::value(&config.port)->default_value(6379), "redis server port") - ; - - po::variables_map vm; - - try - { - po::store(po::parse_command_line(argc, argv, description), vm); - po::notify(vm); - } - catch(const po::error &e) - { - std::cerr << e.what() << "\n"; - return EXIT_FAILURE; - } - - if( vm.count("help") ) - { - std::cout << description << "\n"; - return EXIT_SUCCESS; - } - - boost::asio::io_service ioService; - Benchmark benchmark(ioService, config); - - benchmark.run(); - - return 0; -} - diff --git a/examples/sync_pipeline.cpp b/examples/sync_pipeline.cpp deleted file mode 100644 index f2626dd..0000000 --- a/examples/sync_pipeline.cpp +++ /dev/null @@ -1,50 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include - -int main(int, char **) -{ - boost::asio::ip::address address = boost::asio::ip::address::from_string("127.0.0.1"); - const unsigned short port = 6379; - boost::asio::ip::tcp::endpoint endpoint(address, port); - - boost::asio::io_service ioService; - redisclient::RedisSyncClient redis(ioService); - boost::system::error_code ec; - - redis.connect(endpoint, ec); - - if (ec) - { - std::cerr << "Can't connect to redis: " << ec.message() << std::endl; - return EXIT_FAILURE; - } - - redisclient::Pipeline pipeline = redis.pipelined().command("SET", {"key", "value"}) - .command("INCR", {"counter"}) - .command("INCR", {"counter"}) - .command("DECR", {"counter"}) - .command("GET", {"counter"}) - .command("DEL", {"counter"}) - .command("DEL", {"key"}); - redisclient::RedisValue result = pipeline.finish(); - - if (result.isOk()) - { - for(const auto &i: result.toArray()) - { - std::cout << "Result: " << i.inspect() << "\n"; - } - - return EXIT_SUCCESS; - } - else - { - std::cerr << "GET error: " << result.toString() << "\n"; - return EXIT_FAILURE; - } -} diff --git a/examples/sync_set_get.cpp b/examples/sync_set_get.cpp deleted file mode 100644 index 8dbb0f9..0000000 --- a/examples/sync_set_get.cpp +++ /dev/null @@ -1,48 +0,0 @@ -#include -#include -#include -#include -#include -#include - -int main(int, char **) -{ - boost::asio::ip::address address = boost::asio::ip::address::from_string("127.0.0.1"); - const unsigned short port = 6379; - boost::asio::ip::tcp::endpoint endpoint(address, port); - - boost::asio::io_service ioService; - redisclient::RedisSyncClient redis(ioService); - boost::system::error_code ec; - - redis.connect(endpoint, ec); - - if(ec) - { - std::cerr << "Can't connect to redis: " << ec.message() << std::endl; - return EXIT_FAILURE; - } - - redisclient::RedisValue result; - - result = redis.command("SET", {"key", "value"}); - - if( result.isError() ) - { - std::cerr << "SET error: " << result.toString() << "\n"; - return EXIT_FAILURE; - } - - result = redis.command("GET", {"key"}); - - if( result.isOk() ) - { - std::cout << "GET: " << result.toString() << "\n"; - return EXIT_SUCCESS; - } - else - { - std::cerr << "GET error: " << result.toString() << "\n"; - return EXIT_FAILURE; - } -} diff --git a/examples/sync_timeout.cpp b/examples/sync_timeout.cpp deleted file mode 100644 index 6089bfa..0000000 --- a/examples/sync_timeout.cpp +++ /dev/null @@ -1,51 +0,0 @@ -#include -#include -#include -#include -#include -#include - -int main(int, char **) -{ - boost::asio::ip::address address = boost::asio::ip::address::from_string("127.0.0.1"); - const unsigned short port = 6379; - boost::asio::ip::tcp::endpoint endpoint(address, port); - - boost::asio::io_service ioService; - redisclient::RedisSyncClient redisClient(ioService); - boost::system::error_code ec; - - redisClient.setConnectTimeout(boost::posix_time::seconds(3)) - .setCommandTimeout(boost::posix_time::seconds(3)); - redisClient.connect(endpoint, ec); - - if (ec) - { - std::cerr << "Can't connect to redis: " << ec.message() << std::endl; - return EXIT_FAILURE; - } - - while(1) - { - redisclient::RedisValue result = redisClient.command("GET", {"key"}, ec); - - if (ec) - { - std::cerr << "Network error: " << ec.message() << "\n"; - break; - } - else if (result.isError()) - { - std::cerr << "GET error: " << result.toString() << "\n"; - return EXIT_FAILURE; - } - else - { - std::cout << "GET: " << result.toString() << "\n"; - } - - sleep(1); - } - - return EXIT_FAILURE; -} diff --git a/include/redis/async_client.hpp b/include/redis/async_client.hpp new file mode 100644 index 0000000..da70d28 --- /dev/null +++ b/include/redis/async_client.hpp @@ -0,0 +1,99 @@ +#ifndef REDIS_ASYNC_CLIENT_HEADER +#define REDIS_ASYNC_CLIENT_HEADER + +#include + +#include +#include +#include +#include + +#include "redis/base_client.hpp" +#include "redis/value.hpp" +#include "redis/buffer.hpp" +#include "redis/config.hpp" + +namespace redis { + class base_client; + + class async_client { + public: + // Subscribe handle. + struct subscribe_handle { + size_t id; + std::string channel; + }; + + typedef base_client::State State; + + REDIS_CLIENT_DECL async_client(asio::io_context &ioService); + REDIS_CLIENT_DECL ~async_client(); + + // Connect to redis server + REDIS_CLIENT_DECL void connect(const asio::ip::address &address, unsigned short port, std::function handler); + + // Connect to redis server + REDIS_CLIENT_DECL void connect(const asio::ip::tcp::endpoint &endpoint, std::function handler); + + // backward compatibility + inline void asyncConnect(const asio::ip::address &address, unsigned short port, std::function handler) { + connect(address, port, std::move(handler)); + } + + // backward compatibility + inline void asyncConnect(const asio::ip::tcp::endpoint &endpoint, std::function handler) { + connect(endpoint, handler); + } + + // Return true if is connected to redis. + // Deprecated: use state() == RedisAsyncClisend::State::Connected. + REDIS_CLIENT_DECL bool isConnected() const; + + // Return connection state. See base_client::State. + REDIS_CLIENT_DECL State state() const; + + // disconnect from redis and clear command queue + REDIS_CLIENT_DECL void disconnect(); + + // Set custom error handler. + REDIS_CLIENT_DECL void installErrorsubscribe_handler(std::function handler); + + // Execute command on Redis server with the list of arguments. + REDIS_CLIENT_DECL void command(const std::string &cmd, std::deque args, std::function handler = dummysubscribe_handler); + + // Subscribe to channel. subscribe_handler msgsubscribe_handler will be called + // when someone publish message on channel. Call unsubscribe + // to stop the subscription. + REDIS_CLIENT_DECL subscribe_handle subscribe(const std::string &channelName, std::function msg)> msgsubscribe_handler, std::function handler = &dummysubscribe_handler); + + + REDIS_CLIENT_DECL subscribe_handle psubscribe(const std::string &pattern, std::function msg)> msgsubscribe_handler, std::function handler = &dummysubscribe_handler); + + // Unsubscribe + REDIS_CLIENT_DECL void unsubscribe(const subscribe_handle &handle); + REDIS_CLIENT_DECL void punsubscribe(const subscribe_handle &handle); + + // Subscribe to channel. subscribe_handler msgsubscribe_handler will be called + // when someone publish message on channel; it will be + // unsubscribed after call. + REDIS_CLIENT_DECL void singleShotSubscribe(const std::string &channel, std::function msg)> msgsubscribe_handler, std::function handler = &dummysubscribe_handler); + + REDIS_CLIENT_DECL void singleShotPSubscribe(const std::string &channel, std::function msg)> msgsubscribe_handler, std::function handler = &dummysubscribe_handler); + + // Publish message on channel. + REDIS_CLIENT_DECL void publish(const std::string &channel, const buffer &msg, std::function handler = &dummysubscribe_handler); + REDIS_CLIENT_DECL static void dummysubscribe_handler(value) {} + + protected: + REDIS_CLIENT_DECL bool stateValid() const; + + private: + std::shared_ptr pimpl; + }; +} + +#ifdef REDIS_CLIENT_HEADER_ONLY + #include "src/async_client.cpp" +#endif + +#endif diff --git a/include/redis/base_client.hpp b/include/redis/base_client.hpp new file mode 100644 index 0000000..88119d9 --- /dev/null +++ b/include/redis/base_client.hpp @@ -0,0 +1,127 @@ +/* + * Copyright (C) Alex Nekipelov (alex@nekipelov.net) + * License: MIT + */ + +#ifndef REDISCLIENT_REDISCLIENTIMPL_H +#define REDISCLIENT_REDISCLIENTIMPL_H + +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "redis/parser.hpp" +#include "redis/buffer.hpp" +#include "redis/config.hpp" + +namespace redis { + class base_client: public std::enable_shared_from_this { + public: + enum class State { + Unconnected, + Connecting, + Connected, + Subscribed, + Closed + }; + + REDIS_CLIENT_DECL base_client(asio::io_context &ioService); + REDIS_CLIENT_DECL ~base_client(); + + REDIS_CLIENT_DECL void handleAsyncConnect(const asio::error_code &ec, const std::function &handler); + REDIS_CLIENT_DECL size_t subscribe(const std::string &command, const std::string &channel, std::function msg)> msgsubscribe_handler, std::function handler); + REDIS_CLIENT_DECL void singleShotSubscribe(const std::string &command, const std::string &channel, std::function msg)> msgsubscribe_handler, std::function handler); + REDIS_CLIENT_DECL void unsubscribe(const std::string &command, size_t handle_id, const std::string &channel, std::function handler); + REDIS_CLIENT_DECL void close() noexcept; + REDIS_CLIENT_DECL State getState() const; + REDIS_CLIENT_DECL static std::vector makeCommand(const std::deque &items); + REDIS_CLIENT_DECL value doSyncCommand(const std::deque &buff); + REDIS_CLIENT_DECL void doAsyncCommand(std::vector buff, std::function handler); + + REDIS_CLIENT_DECL void sendNextCommand(); + REDIS_CLIENT_DECL void processMessage(); + REDIS_CLIENT_DECL void doProcessMessage(value v); + REDIS_CLIENT_DECL void asyncWrite(const asio::error_code &ec, const size_t); + REDIS_CLIENT_DECL void asyncRead(const asio::error_code &ec, const size_t); + + REDIS_CLIENT_DECL void onRedisError(const value &); + REDIS_CLIENT_DECL static void defaulErrorsubscribe_handler(const std::string &s); + + REDIS_CLIENT_DECL static void append(std::vector &vec, const buffer &buf); + REDIS_CLIENT_DECL static void append(std::vector &vec, const std::string &s); + REDIS_CLIENT_DECL static void append(std::vector &vec, const char *s); + REDIS_CLIENT_DECL static void append(std::vector &vec, char c); + + template + static inline void append(std::vector &vec, const char (&s)[size]); + + template + inline void post(const subscribe_handler &handler); + + asio::io_context::strand strand; + asio::ip::tcp::socket socket; + parser redisParser; + std::array buf; + size_t subscribeSeq; + + typedef std::pair &buf)> > Msgsubscribe_handlerType; + typedef std::function &buf)> SingleShotsubscribe_handlerType; + + typedef std::multimap Msgsubscribe_handlersMap; + typedef std::multimap SingleShotsubscribe_handlersMap; + + std::queue> handlers; + std::deque> dataWrited; + std::deque> dataQueued; + Msgsubscribe_handlersMap msgsubscribe_handlers; + SingleShotsubscribe_handlersMap singleShotMsgsubscribe_handlers; + + std::function errorsubscribe_handler; + State state; + }; + + template + void base_client::append(std::vector &vec, const char (&s)[size]) { + vec.insert(vec.end(), s, s + size); + } + + template + inline void base_client::post(const subscribe_handler &handler) { + asio::post(asio::bind_executor(strand, handler)); + } + + inline std::string to_string(base_client::State state) { + switch(state) { + case base_client::State::Unconnected: + return "Unconnected"; + break; + case base_client::State::Connecting: + return "Connecting"; + break; + case base_client::State::Connected: + return "Connected"; + break; + case base_client::State::Subscribed: + return "Subscribed"; + break; + case base_client::State::Closed: + return "Closed"; + break; + } + + return "Invalid"; + } +} + +#ifdef REDIS_CLIENT_HEADER_ONLY + #include "src/base_client.cpp" +#endif + +#endif diff --git a/include/redis/buffer.hpp b/include/redis/buffer.hpp new file mode 100644 index 0000000..18cfd74 --- /dev/null +++ b/include/redis/buffer.hpp @@ -0,0 +1,53 @@ +#ifndef REDIS_BUFFER_HEADER +#define REDIS_BUFFER_HEADER + +#include +#include +#include +#include + +#include "redis/config.hpp" + +namespace redis { + class buffer { + public: + inline buffer(); + inline buffer(const char *ptr, size_t dataSize); + inline buffer(const char *s); + inline buffer(const std::string &s); + inline buffer(const std::vector &buf); + + inline size_t size() const; + inline const char *data() const; + + private: + const char *ptr_; + size_t size_; + }; + + buffer::buffer() : ptr_(NULL), size_(0) { + } + + buffer::buffer(const char *ptr, size_t dataSize) : ptr_(ptr), size_(dataSize) { + } + + buffer::buffer(const char *s) : ptr_(s), size_(s == NULL ? 0 : strlen(s)) { + } + + buffer::buffer(const std::string &s) : ptr_(s.c_str()), size_(s.length()) { + } + + buffer::buffer(const std::vector &buf) : ptr_(buf.empty() ? NULL : &buf[0]), size_(buf.size()) { + } + + size_t buffer::size() const { + return size_; + } + + const char *buffer::data() const { + return ptr_; + } +} + +#endif + diff --git a/include/redis/config.hpp b/include/redis/config.hpp new file mode 100644 index 0000000..f350368 --- /dev/null +++ b/include/redis/config.hpp @@ -0,0 +1,27 @@ +#ifndef REDIS_CONFIG_HEADER +#define REDIS_CONFIG_HEADER + +// Default to a header-only compilation +#ifndef REDIS_CLIENT_HEADER_ONLY + #ifndef REDIS_CLIENT_SEPARATED_COMPILATION + #define REDIS_CLIENT_HEADER_ONLY + #endif +#endif + +#ifdef REDIS_CLIENT_HEADER_ONLY + #define REDIS_CLIENT_DECL inline +#else + #if defined(WIN32) && defined(REDIS_CLIENT_DYNLIB) + #ifdef REDIS_CLIENT_BUILD + #define REDIS_CLIENT_DECL __declspec(dllexport) + #else + #define REDIS_CLIENT_DECL __declspec(dllimport) + #endif + #endif +#endif + +#ifndef REDIS_CLIENT_DECL + #define REDIS_CLIENT_DECL +#endif + +#endif diff --git a/include/redis/parser.hpp b/include/redis/parser.hpp new file mode 100644 index 0000000..4447465 --- /dev/null +++ b/include/redis/parser.hpp @@ -0,0 +1,82 @@ +#ifndef REDIS_PARSER_HEADER +#define REDIS_PARSER_HEADER + +#include +#include +#include +#include + +#include "redis/value.hpp" +#include "redis/config.hpp" + +namespace redis { + class parser { + public: + REDIS_CLIENT_DECL parser(); + + enum ParseResult { + Completed, + Incompleted, + Error, + }; + + REDIS_CLIENT_DECL std::pair parse(const char *ptr, size_t size); + REDIS_CLIENT_DECL value result(); + + protected: + REDIS_CLIENT_DECL std::pair parseChunk(const char *ptr, size_t size); + REDIS_CLIENT_DECL std::pair parseArray(const char *ptr, size_t size); + + inline bool isChar(int c) { + return c >= 0 && c <= 127; + } + + inline bool isControl(int c) { + return (c >= 0 && c <= 31) || (c == 127); + } + + REDIS_CLIENT_DECL long int bufToLong(const char *str, size_t size); + + private: + enum State { + Start = 0, + + String = 1, + StringLF = 2, + + ErrorString = 3, + ErrorLF = 4, + + Integer = 5, + IntegerLF = 6, + + BulkSize = 7, + BulkSizeLF = 8, + Bulk = 9, + BulkCR = 10, + BulkLF = 11, + + ArraySize = 12, + ArraySizeLF = 13, + + } state; + + long int bulkSize; + std::vector buf; + std::stack arrayStack; + std::stack valueStack; + + static const char stringReply = '+'; + static const char errorReply = '-'; + static const char integerReply = ':'; + static const char bulkReply = '$'; + static const char arrayReply = '*'; + }; + +} + +#ifdef REDIS_CLIENT_HEADER_ONLY + #include "src/parser.cpp" +#endif + +#endif diff --git a/include/redis/sync_client.hpp b/include/redis/sync_client.hpp new file mode 100644 index 0000000..3e9214e --- /dev/null +++ b/include/redis/sync_client.hpp @@ -0,0 +1,55 @@ +#ifndef REDIS_SYNC_CLIENT_HEADER +#define REDIS_SYNC_CLIENT_HEADER + +#include + +#include +#include +#include + +#include "redis/base_client.hpp" +#include "redis/buffer.hpp" +#include "redis/value.hpp" +#include "redis/config.hpp" + +namespace redis { + class base_client; + + class sync_client { + public: + sync_client(const sync_client&) = delete; + typedef base_client::State State; + + REDIS_CLIENT_DECL sync_client(asio::io_context &ioService); + REDIS_CLIENT_DECL ~sync_client(); + + // Connect to redis server + REDIS_CLIENT_DECL bool connect(const asio::ip::tcp::endpoint &endpoint, std::string &errmsg); + + // Connect to redis server + REDIS_CLIENT_DECL bool connect(const asio::ip::address &address, unsigned short port, std::string &errmsg); + + // Set custom error handler. + REDIS_CLIENT_DECL void installErrorsubscribe_handler(std::function handler); + + // Execute command on Redis server with the list of arguments. + REDIS_CLIENT_DECL value command(const std::string &cmd, std::deque args); + + REDIS_CLIENT_DECL value command(const std::string &cmd); + + // Return connection state. See base_client::State. + REDIS_CLIENT_DECL State state() const; + + protected: + REDIS_CLIENT_DECL bool stateValid() const; + + private: + std::shared_ptr pimpl; + }; +} + +#ifdef REDIS_CLIENT_HEADER_ONLY + #include "src/sync_client.cpp" +#endif + +#endif diff --git a/include/redis/value.hpp b/include/redis/value.hpp new file mode 100644 index 0000000..cd1d38f --- /dev/null +++ b/include/redis/value.hpp @@ -0,0 +1,102 @@ +#ifndef REDIS_VALUE_HEADER +#define REDIS_VALUE_HEADER + +#include +#include +#include + +#include "redis/config.hpp" + +namespace redis { + class value { + public: + struct ErrorTag {}; + + REDIS_CLIENT_DECL value(); + REDIS_CLIENT_DECL value(value &&other); + REDIS_CLIENT_DECL value(int64_t i); + REDIS_CLIENT_DECL value(const char *s); + REDIS_CLIENT_DECL value(const std::string &s); + REDIS_CLIENT_DECL value(std::vector buf); + REDIS_CLIENT_DECL value(std::vector buf, struct ErrorTag); + REDIS_CLIENT_DECL value(std::vector array); + + value(const value &) = default; + value& operator = (const value &) = default; + value& operator = (value &&) = default; + + // Return the _value as a std::string if + // type is a byte string; otherwise returns an empty std::string. + REDIS_CLIENT_DECL std::string toString() const; + + // Return the _value as a std::vector if + // type is a byte string; otherwise returns an empty std::vector. + REDIS_CLIENT_DECL std::vector toByteArray() const; + + // Return the _value as a std::vector if + // type is an int; otherwise returns 0. + REDIS_CLIENT_DECL int64_t toInt() const; + + // Return the _value as an array if type is an array; + // otherwise returns an empty array. + REDIS_CLIENT_DECL std::vector toArray() const; + + // Return the string representation of the _value. Use + // for dump content of the _value. + REDIS_CLIENT_DECL std::string inspect() const; + + // Return true if _value not a error + REDIS_CLIENT_DECL bool isOk() const; + // Return true if _value is a error + REDIS_CLIENT_DECL bool isError() const; + + // Return true if this is a null. + REDIS_CLIENT_DECL bool isNull() const; + // Return true if type is an int + REDIS_CLIENT_DECL bool isInt() const; + // Return true if type is an array + REDIS_CLIENT_DECL bool isArray() const; + // Return true if type is a string/byte array. Alias for isString(); + REDIS_CLIENT_DECL bool isByteArray() const; + // Return true if type is a string/byte array. Alias for isByteArray(). + REDIS_CLIENT_DECL bool isString() const; + + REDIS_CLIENT_DECL bool operator == (const value &rhs) const; + REDIS_CLIENT_DECL bool operator != (const value &rhs) const; + + protected: + template + T castTo() const; + + template + bool typeEq() const; + + private: + struct NullTag { + inline bool operator == (const NullTag &) const { + return true; + } + }; + + bool _error; + std::variant, std::vector> _value; + }; + + template + T value::castTo() const { + if (auto pval = std::get_if(&_value)) return *pval; + else return T(); + } + + template + bool value::typeEq() const { + if( std::holds_alternative(_value)) return true; + else return false; + } +} + +#ifdef REDIS_CLIENT_HEADER_ONLY + #include "src/value.cpp" +#endif + +#endif // REDISCLIENT_REDISVALUE_H diff --git a/include/redis/version.hpp b/include/redis/version.hpp new file mode 100644 index 0000000..000ac1d --- /dev/null +++ b/include/redis/version.hpp @@ -0,0 +1,6 @@ +#ifndef REDIS_VERSION_HEADER +#define REDISCLIENT_VERSION_H + +#define REDIS_CLIENT_VERSION 501 + +#endif diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt deleted file mode 100644 index af85b2e..0000000 --- a/src/CMakeLists.txt +++ /dev/null @@ -1 +0,0 @@ -add_subdirectory(redisclient) diff --git a/src/async_client.cpp b/src/async_client.cpp new file mode 100644 index 0000000..af40b4e --- /dev/null +++ b/src/async_client.cpp @@ -0,0 +1,121 @@ +#ifndef REDIS_ASYNC_CLIENT_SOURCE +#define REDIS_ASYNC_CLIENT_SOURCE + +#include +#include + +#include "redis/async_client.hpp" + +namespace redis { + async_client::async_client(asio::io_context &ioService) : pimpl(std::make_shared(ioService)) { + pimpl->errorsubscribe_handler = std::bind(&base_client::defaulErrorsubscribe_handler, std::placeholders::_1); + } + + async_client::~async_client() { + pimpl->close(); + } + + void async_client::connect(const ::asio::ip::address &address, unsigned short port, std::function handler) { + asio::ip::tcp::endpoint endpoint(address, port); + connect(endpoint, std::move(handler)); + } + + void async_client::connect(const asio::ip::tcp::endpoint &endpoint, std::function handler) { + if(pimpl->state == State::Unconnected || pimpl->state == State::Closed) { + pimpl->state = State::Connecting; + pimpl->socket.async_connect(endpoint, std::bind(&base_client::handleAsyncConnect, pimpl, std::placeholders::_1, std::move(handler))); + } else { + std::stringstream ss; + + ss << "async_client::connect called on socket with state " << to_string(pimpl->state); + handler(false, ss.str()); + } + } + + bool async_client::isConnected() const { + return pimpl->getState() == State::Connected || pimpl->getState() == State::Subscribed; + } + + void async_client::disconnect() { + pimpl->close(); + } + + void async_client::installErrorsubscribe_handler(std::function handler) { + pimpl->errorsubscribe_handler = std::move(handler); + } + + void async_client::command(const std::string &cmd, std::deque args, std::function handler) { + if(stateValid()) { + args.emplace_front(cmd); + + pimpl->post(std::bind(&base_client::doAsyncCommand, pimpl, std::move(pimpl->makeCommand(args)), std::move(handler))); + } + } + + async_client::subscribe_handle async_client::subscribe(const std::string &channel, std::function msg)> msgsubscribe_handler, std::function handler) { + auto handleId = pimpl->subscribe("subscribe", channel, msgsubscribe_handler, handler); + return { handleId , channel }; + } + + async_client::subscribe_handle async_client::psubscribe(const std::string &pattern, std::function msg)> msgsubscribe_handler, std::function handler) { + auto handleId = pimpl->subscribe("psubscribe", pattern, msgsubscribe_handler, handler); + return{ handleId , pattern }; + } + + void async_client::unsubscribe(const subscribe_handle &handle) { + pimpl->unsubscribe("unsubscribe", handle.id, handle.channel, dummysubscribe_handler); + } + + void async_client::punsubscribe(const subscribe_handle &handle) { + pimpl->unsubscribe("punsubscribe", handle.id, handle.channel, dummysubscribe_handler); + } + + void async_client::singleShotSubscribe(const std::string &channel, std::function msg)> msgsubscribe_handler, std::function handler) { + pimpl->singleShotSubscribe("subscribe", channel, msgsubscribe_handler, handler); + } + + void async_client::singleShotPSubscribe(const std::string &pattern, std::function msg)> msgsubscribe_handler, std::function handler) { + pimpl->singleShotSubscribe("psubscribe", pattern, msgsubscribe_handler, handler); + } + + void async_client::publish(const std::string &channel, const buffer &msg, std::function handler) { + assert(pimpl->state == State::Connected ); + static const std::string publishStr = "PUBLISH"; + + if(pimpl->state == State::Connected) { + std::deque items(3); + + items[0] = publishStr; + items[1] = channel; + items[2] = msg; + + pimpl->post(std::bind(&base_client::doAsyncCommand, pimpl, pimpl->makeCommand(items), std::move(handler))); + } else { + std::stringstream ss; + + ss << "async_client::command called with invalid state " << to_string(pimpl->state); + pimpl->errorsubscribe_handler(ss.str()); + } + } + + async_client::State async_client::state() const { + return pimpl->getState(); + } + + bool async_client::stateValid() const { + assert( pimpl->state == State::Connected ); + + if(pimpl->state != State::Connected) { + std::stringstream ss; + + ss << "async_client::command called with invalid state " << to_string(pimpl->state); + + pimpl->errorsubscribe_handler(ss.str()); + return false; + } + + return true; + } +} + +#endif diff --git a/src/base_client.cpp b/src/base_client.cpp new file mode 100644 index 0000000..9dfcd76 --- /dev/null +++ b/src/base_client.cpp @@ -0,0 +1,317 @@ +#ifndef REDIS_BASE_CLIENT_SOURCE +#define REDIS_BASE_CLIENT_SOURCE + +#include +#include + +#include + +#include "redis/base_client.hpp" + +namespace redis { + base_client::base_client(asio::io_context &ioService) : strand(ioService), socket(ioService), subscribeSeq(0), state(State::Unconnected) { + } + + base_client::~base_client() { + close(); + } + + void base_client::close() noexcept { + if(state != State::Closed) { + asio::error_code ignored_ec; + + msgsubscribe_handlers.clear(); + + socket.shutdown(asio::ip::tcp::socket::shutdown_both, ignored_ec); + socket.close(ignored_ec); + + state = State::Closed; + } + } + + base_client::State base_client::getState() const { + return state; + } + + void base_client::processMessage() { + socket.async_read_some(asio::buffer(buf), std::bind(&base_client::asyncRead, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); + } + + void base_client::doProcessMessage(value v) { + if( state == State::Subscribed ) { + std::vector result = v.toArray(); + auto resultSize = result.size(); + + if( resultSize >= 3 ) { + const value &command = result[0]; + const value &queueName = result[(resultSize == 3)?1:2]; + const value &v = result[(resultSize == 3)?2:3]; + const value &pattern = (resultSize == 4) ? result[1] : ""; + + std::string cmd = command.toString(); + + if(cmd == "message" || cmd == "pmessage") { + SingleShotsubscribe_handlersMap::iterator it = singleShotMsgsubscribe_handlers.find(queueName.toString()); + + if(it != singleShotMsgsubscribe_handlers.end()) { + asio::post(std::bind(it->second, v.toByteArray())); + singleShotMsgsubscribe_handlers.erase(it); + } + + std::pair pair = msgsubscribe_handlers.equal_range(queueName.toString()); + + for(Msgsubscribe_handlersMap::iterator handlerIt = pair.first; handlerIt != pair.second; ++handlerIt) { + asio::post(asio::bind_executor(strand, std::bind(handlerIt->second.second, v.toByteArray()))); + } + }else if( handlers.empty() == false && (cmd == "subscribe" || cmd == "unsubscribe" || cmd == "psubscribe" || cmd == "punsubscribe")) { + handlers.front()(std::move(v)); + handlers.pop(); + } else { + std::stringstream ss; + + ss << "[RedisClient] invalid command: " << command.toString(); + + errorsubscribe_handler(ss.str()); + return; + } + } else { + errorsubscribe_handler("[RedisClient] Protocol error"); + return; + } + } else { + if( handlers.empty() == false ) { + handlers.front()(std::move(v)); + handlers.pop(); + } else { + std::stringstream ss; + ss << "[RedisClient] unexpected message: " << v.inspect(); + + errorsubscribe_handler(ss.str()); + return; + } + } + } + + void base_client::asyncWrite(const asio::error_code &ec, size_t) { + dataWrited.clear(); + + if(ec) { + errorsubscribe_handler(ec.message()); + return; + } + + if(dataQueued.empty() == false) { + std::vector buffers(dataQueued.size()); + + for(size_t i = 0; i < dataQueued.size(); ++i) { + buffers[i] = ::asio::buffer(dataQueued[i]); + } + + std::swap(dataQueued, dataWrited); + + asio::async_write(socket, buffers, std::bind(&base_client::asyncWrite, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); + } + } + + void base_client::handleAsyncConnect(const asio::error_code &ec, const std::function &handler) { + if(!ec) { + socket.set_option(asio::ip::tcp::no_delay(true)); + state = State::Connected; + handler(true, std::string()); + processMessage(); + } else { + state = State::Unconnected; + handler(false, ec.message()); + } + } + + std::vector base_client::makeCommand(const std::deque &items) { + static const char crlf[] = {'\r', '\n'}; + std::vector result; + append(result, '*'); + append(result, std::to_string(items.size())); + append<>(result, crlf); + + for(const buffer &item: items) { + append(result, '$'); + append(result, std::to_string(item.size())); + append<>(result, crlf); + append(result, item); + append<>(result, crlf); + } + + return result; + } + + value base_client::doSyncCommand(const std::deque &buff) { + asio::error_code ec; + { + std::vector data = makeCommand(buff); + asio::write(socket, asio::buffer(data), asio::transfer_all(), ec); + } + + if(ec) { + errorsubscribe_handler(ec.message()); + return value(); + } else { + std::array inbuff; + + for(;;) { + size_t size = socket.read_some(asio::buffer(inbuff)); + + for(size_t pos = 0; pos < size;) { + std::pair result = redisParser.parse(inbuff.data() + pos, size - pos); + + if(result.second == parser::Completed) { + return redisParser.result(); + } else if(result.second == parser::Incompleted) { + pos += result.first; + continue; + } else { + errorsubscribe_handler("[RedisClient] Parser error"); + return value(); + } + } + } + } + } + + void base_client::doAsyncCommand(std::vector buff, std::function handler) { + handlers.push( std::move(handler) ); + dataQueued.push_back(std::move(buff)); + + if(dataWrited.empty()) { + // start transmit process + asyncWrite(asio::error_code(), 0); + } + } + + void base_client::asyncRead(const asio::error_code &ec, const size_t size) { + if(ec || size == 0) { + errorsubscribe_handler(ec.message()); + return; + } + + for(size_t pos = 0; pos < size;) { + std::pair result = redisParser.parse(buf.data() + pos, size - pos); + + if(result.second == parser::Completed) { + doProcessMessage(std::move(redisParser.result())); + } else if(result.second == parser::Incompleted) { + processMessage(); + return; + } else { + errorsubscribe_handler("[RedisClient] Parser error"); + return; + } + + pos += result.first; + } + + processMessage(); + } + + void base_client::onRedisError(const value &v) { + errorsubscribe_handler(v.toString()); + } + + void base_client::defaulErrorsubscribe_handler(const std::string &s) { + throw std::runtime_error(s); + } + + void base_client::append(std::vector &vec, const buffer &buf) { + vec.insert(vec.end(), buf.data(), buf.data() + buf.size()); + } + + void base_client::append(std::vector &vec, const std::string &s) { + vec.insert(vec.end(), s.begin(), s.end()); + } + + void base_client::append(std::vector &vec, const char *s) { + vec.insert(vec.end(), s, s + strlen(s)); + } + + void base_client::append(std::vector &vec, char c) { + vec.resize(vec.size() + 1); + vec[vec.size() - 1] = c; + } + + size_t base_client::subscribe(const std::string &command, const std::string &channel, std::function msg)> msgsubscribe_handler, std::function handler) { + assert(state == State::Connected || state == State::Subscribed); + + if (state == State::Connected || state == State::Subscribed) { + std::deque items{command, channel}; + post(std::bind(&base_client::doAsyncCommand, this, makeCommand(items), std::move(handler))); + msgsubscribe_handlers.insert(std::make_pair(channel, std::make_pair(subscribeSeq, std::move(msgsubscribe_handler)))); + state = State::Subscribed; + + return subscribeSeq++; + } else { + std::stringstream ss; + ss << "base_client::subscribe called with invalid state " << to_string(state); + + errorsubscribe_handler(ss.str()); + return 0; + } + } + + void base_client::singleShotSubscribe(const std::string &command, const std::string &channel, std::function msg)> msgsubscribe_handler, std::function handler) { + assert(state == State::Connected || state == State::Subscribed); + + if (state == State::Connected || state == State::Subscribed) { + std::deque items{command, channel}; + post(std::bind(&base_client::doAsyncCommand, this, makeCommand(items), std::move(handler))); + singleShotMsgsubscribe_handlers.insert(std::make_pair(channel, std::move(msgsubscribe_handler))); + state = State::Subscribed; + } else { + std::stringstream ss; + ss << "base_client::singleShotSubscribe called with invalid state " << to_string(state); + + errorsubscribe_handler(ss.str()); + } + } + + void base_client::unsubscribe(const std::string &command, size_t handleId, const std::string &channel, std::function handler) { + #ifdef DEBUG + static int recursion = 0; + assert(recursion++ == 0); + #endif + + assert(state == State::Connected || state == State::Subscribed); + + if (state == State::Connected || state == State::Subscribed) { + // Remove subscribe-handler + typedef base_client::Msgsubscribe_handlersMap::iterator iterator; + std::pair pair = msgsubscribe_handlers.equal_range(channel); + + for (iterator it = pair.first; it != pair.second;) { + if (it->second.first == handleId) { + msgsubscribe_handlers.erase(it++); + } else { + ++it; + } + } + + std::deque items{ command, channel }; + + // Unsubscribe command for Redis + post(std::bind(&base_client::doAsyncCommand, this, makeCommand(items), handler)); + } else { + std::stringstream ss; + + ss << "base_client::unsubscribe called with invalid state " << to_string(state); + + #ifdef DEBUG + --recursion; + #endif + errorsubscribe_handler(ss.str()); + return; + } + + #ifdef DEBUG + --recursion; + #endif + } +} + +#endif diff --git a/src/parser.cpp b/src/parser.cpp new file mode 100644 index 0000000..9af4f63 --- /dev/null +++ b/src/parser.cpp @@ -0,0 +1,389 @@ +#ifndef REDIS_PARSER_SOURCE +#define REDIS_PARSER_SOURCE + +#include +#include + +#include "redis/parser.hpp" + +namespace redis { + parser::parser() : state(Start), bulkSize(0) { + } + + std::pair parser::parse(const char *ptr, size_t size) { + if(!arrayStack.empty()) return parser::parseArray(ptr, size); + else return parser::parseChunk(ptr, size); + } + + std::pair parser::parseArray(const char *ptr, size_t size) { + assert(!arrayStack.empty()); + assert(!valueStack.empty()); + + long int arraySize = arrayStack.top(); + std::vector arrayValue = valueStack.top().toArray(); + + arrayStack.pop(); + valueStack.pop(); + + size_t position = 0; + + if(arrayStack.empty() == false ) { + std::pair pair = parseArray(ptr, size); + + if(pair.second != Completed) { + valueStack.push(std::move(arrayValue)); + arrayStack.push(arraySize); + + return pair; + } else { + arrayValue.push_back(std::move(valueStack.top())); + valueStack.pop(); + --arraySize; + } + + position += pair.first; + } + + if(position == size) { + valueStack.push(std::move(arrayValue)); + + if(arraySize == 0) { + return std::make_pair(position, Completed); + } else { + arrayStack.push(arraySize); + return std::make_pair(position, Incompleted); + } + } + + long int arrayIndex = 0; + + for(; arrayIndex < arraySize; ++arrayIndex) { + std::pair pair = parse(ptr + position, size - position); + + position += pair.first; + + if(pair.second == Error) { + return std::make_pair(position, Error); + } else if(pair.second == Incompleted) { + arraySize -= arrayIndex; + valueStack.push(std::move(arrayValue)); + arrayStack.push(arraySize); + + return std::make_pair(position, Incompleted); + } else { + assert( valueStack.empty() == false ); + arrayValue.push_back( std::move(valueStack.top()) ); + valueStack.pop(); + } + } + + assert(arrayIndex == arraySize); + valueStack.push(std::move(arrayValue)); + return std::make_pair(position, Completed); + } + + std::pair parser::parseChunk(const char *ptr, size_t size) { + size_t position = 0; + + for(; position < size; ++position) { + char c = ptr[position]; + + switch(state) { + case Start: + buf.clear(); + + switch(c) { + case stringReply: + state = String; + break; + case errorReply: + state = ErrorString; + break; + case integerReply: + state = Integer; + break; + case bulkReply: + state = BulkSize; + bulkSize = 0; + break; + case arrayReply: + state = ArraySize; + break; + default: + state = Start; + return std::make_pair(position + 1, Error); + } + break; + case String: + if( c == '\r' ) { + state = StringLF; + } else if(isChar(c) && !isControl(c)) { + buf.push_back(c); + } else { + state = Start; + return std::make_pair(position + 1, Error); + } + break; + case ErrorString: + if(c == '\r') { + state = ErrorLF; + } else if(isChar(c) && !isControl(c)) { + buf.push_back(c); + } else { + state = Start; + return std::make_pair(position + 1, Error); + } + break; + case BulkSize: + if(c == '\r') { + if( buf.empty()) { + state = Start; + return std::make_pair(position + 1, Error); + } else { + state = BulkSizeLF; + } + } else if( isdigit(c) || c == '-' ) { + buf.push_back(c); + } else { + state = Start; + return std::make_pair(position + 1, Error); + } + break; + case StringLF: + if( c == '\n') { + state = Start; + valueStack.push(buf); + return std::make_pair(position + 1, Completed); + } else { + state = Start; + return std::make_pair(position + 1, Error); + } + break; + case ErrorLF: + if(c == '\n') { + state = Start; + value::ErrorTag tag; + valueStack.push(value(buf, tag)); + return std::make_pair(position + 1, Completed); + } else { + state = Start; + return std::make_pair(position + 1, Error); + } + break; + case BulkSizeLF: + if(c == '\n') { + bulkSize = bufToLong(buf.data(), buf.size()); + buf.clear(); + + if(bulkSize == -1) { + state = Start; + valueStack.push(value()); // Nil + return std::make_pair(position + 1, Completed); + } else if( bulkSize == 0) { + state = BulkCR; + } + else if(bulkSize < 0) { + state = Start; + return std::make_pair(position + 1, Error); + } else { + buf.reserve(bulkSize); + + long int available = size - position - 1; + long int canRead = (std::min)(bulkSize, available); + + if(canRead > 0) { + buf.assign(ptr + position + 1, ptr + position + canRead + 1); + } + + position += canRead; + + if(bulkSize > available) { + bulkSize -= canRead; + state = Bulk; + return std::make_pair(position + 1, Incompleted); + } else { + state = BulkCR; + } + } + } else { + state = Start; + return std::make_pair(position + 1, Error); + } + break; + case Bulk: { + assert( bulkSize > 0 ); + + long int available = size - position; + long int canRead = (std::min)(available, bulkSize); + + buf.insert(buf.end(), ptr + position, ptr + canRead); + bulkSize -= canRead; + position += canRead - 1; + + if(bulkSize > 0) { + return std::make_pair(position + 1, Incompleted); + } else { + state = BulkCR; + + if(size == position + 1) { + return std::make_pair(position + 1, Incompleted); + } + } + break; + } + case BulkCR: + if(c == '\r') { + state = BulkLF; + } else { + state = Start; + return std::make_pair(position + 1, Error); + } + break; + case BulkLF: + if( c == '\n') { + state = Start; + valueStack.push(buf); + return std::make_pair(position + 1, Completed); + } else { + state = Start; + return std::make_pair(position + 1, Error); + } + break; + case ArraySize: + if(c == '\r') { + if(buf.empty()) { + state = Start; + return std::make_pair(position + 1, Error); + } else { + state = ArraySizeLF; + } + } else if( isdigit(c) || c == '-' ) { + buf.push_back(c); + } else { + state = Start; + return std::make_pair(position + 1, Error); + } + break; + case ArraySizeLF: + if(c == '\n') { + int64_t arraySize = bufToLong(buf.data(), buf.size()); + + buf.clear(); + std::vector array; + + if(arraySize == -1 || arraySize == 0) { + state = Start; + valueStack.push(std::move(array)); // Empty array + return std::make_pair(position + 1, Completed); + } else if(arraySize < 0) { + state = Start; + return std::make_pair(position + 1, Error); + } else { + array.reserve(arraySize); + arrayStack.push(arraySize); + valueStack.push(std::move(array)); + state = Start; + + if(position + 1 != size) { + std::pair parseResult = parseArray(ptr + position + 1, size - position - 1); + parseResult.first += position + 1; + return parseResult; + } else { + return std::make_pair(position + 1, Incompleted); + } + } + } else { + state = Start; + return std::make_pair(position + 1, Error); + } + break; + case Integer: + if(c == '\r') { + if(buf.empty()) { + state = Start; + return std::make_pair(position + 1, Error); + } else { + state = IntegerLF; + } + } else if(isdigit(c) || c == '-') { + buf.push_back(c); + } else { + state = Start; + return std::make_pair(position + 1, Error); + } + break; + case IntegerLF: + if(c == '\n') { + int64_t value = bufToLong(buf.data(), buf.size()); + + buf.clear(); + + valueStack.push(value); + state = Start; + + return std::make_pair(position + 1, Completed); + } else { + state = Start; + return std::make_pair(position + 1, Error); + } + break; + default: + state = Start; + return std::make_pair(position + 1, Error); + } + } + + return std::make_pair(position, Incompleted); + } + + value parser::result() { + assert( valueStack.empty() == false ); + + if(valueStack.empty() == false) { + value value = std::move(valueStack.top()); + valueStack.pop(); + + return value; + } else { + return value(); + } + } + + /* + * Convert string to long. I can't use atol/strtol because it + * work only with null terminated string. I can use temporary + * std::string object but that is slower then bufToLong. + */ + long int parser::bufToLong(const char *str, size_t size) { + long int value = 0; + bool sign = false; + + if( str == nullptr || size == 0 ) { + return 0; + } + + if( *str == '-' ) { + sign = true; + ++str; + --size; + + if( size == 0 ) { + return 0; + } + } + + for(const char *end = str + size; str != end; ++str) { + char c = *str; + + // char must be valid, already checked in the parser + assert(c >= '0' && c <= '9'); + + value = value * 10; + value += c - '0'; + } + + return sign ? -value : value; + } +} + +#endif diff --git a/src/redisclient/CMakeLists.txt b/src/redisclient/CMakeLists.txt deleted file mode 100644 index 26e4af7..0000000 --- a/src/redisclient/CMakeLists.txt +++ /dev/null @@ -1,49 +0,0 @@ -set(hdrs config.h - pipeline.h - redisasyncclient.h - redisbuffer.h - redisparser.h - redissyncclient.h - redisvalue.h - version.h - impl/redisclientimpl.h - impl/throwerror.h -) -set(srcs impl/pipeline.cpp - impl/redisasyncclient.cpp - impl/redisclientimpl.cpp - impl/redisparser.cpp - impl/redissyncclient.cpp - impl/redisvalue.cpp -) - -if (HEADER_ONLY) - RedisClientHeaderLibrary(RedisClient HEADERS ${hdrs} ${srcs} WITH_INSTALL) - target_compile_definitions(RedisClient - INTERFACE - REDIS_CLIENT_HEADER_ONLY - ) - target_include_directories(RedisClient - INTERFACE - ${Boost_INCLUDE_DIRS} - ) - target_link_libraries(RedisClient - INTERFACE - ${Boost_SYSTEM_LIBRARY} - ) -else() - RedisClientLibrary(RedisClient SOURCES ${srcs} HEADERS ${hdrs} WITH_INSTALL) - target_compile_definitions(RedisClient - PUBLIC - REDIS_CLIENT_SEPARATED_COMPILATION - ) - target_include_directories(RedisClient - PUBLIC - ${Boost_INCLUDE_DIRS} - ) - target_link_libraries(RedisClient - PUBLIC - ${Boost_SYSTEM_LIBRARY} - ) -endif() - diff --git a/src/redisclient/config.h b/src/redisclient/config.h deleted file mode 100644 index 883d73e..0000000 --- a/src/redisclient/config.h +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright (C) Alex Nekipelov (alex@nekipelov.net) - * License: MIT - */ - -#ifndef REDISCLIENT_CONFIG_H -#define REDISCLIENT_CONFIG_H - -// Default to a header-only compilation -#ifndef REDIS_CLIENT_HEADER_ONLY -# ifndef REDIS_CLIENT_SEPARATED_COMPILATION -# define REDIS_CLIENT_HEADER_ONLY -# endif -#endif - -#ifdef REDIS_CLIENT_HEADER_ONLY -# define REDIS_CLIENT_DECL inline -#else -# if defined(WIN32) && defined(REDIS_CLIENT_DYNLIB) -# // Build to a Window dynamic library (DLL) -# ifdef REDIS_CLIENT_BUILD -# define REDIS_CLIENT_DECL __declspec(dllexport) -# else -# define REDIS_CLIENT_DECL __declspec(dllimport) -# endif -# endif -#endif - -#ifndef REDIS_CLIENT_DECL -# define REDIS_CLIENT_DECL -#endif - - -#endif // REDISCLIENT_CONFIG_H diff --git a/src/redisclient/impl/pipeline.cpp b/src/redisclient/impl/pipeline.cpp deleted file mode 100644 index e1d9639..0000000 --- a/src/redisclient/impl/pipeline.cpp +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright (C) Alex Nekipelov (alex@nekipelov.net) - * License: MIT - */ - -#ifndef REDISCLIENT_PIPELINE_CPP -#define REDISCLIENT_PIPELINE_CPP - -#include "redisclient/pipeline.h" -#include "redisclient/redisvalue.h" -#include "redisclient/redissyncclient.h" - -namespace redisclient -{ - -Pipeline::Pipeline(RedisSyncClient &client) - : client(client) -{ -} - -Pipeline &Pipeline::command(std::string cmd, std::deque args) -{ - args.push_front(std::move(cmd)); - commands.push_back(std::move(args)); - return *this; -} - -RedisValue Pipeline::finish() -{ - return client.pipelined(std::move(commands)); -} - -RedisValue Pipeline::finish(boost::system::error_code &ec) -{ - return client.pipelined(std::move(commands), ec); -} - -} - -#endif // REDISCLIENT_PIPELINE_CPP diff --git a/src/redisclient/impl/redisasyncclient.cpp b/src/redisclient/impl/redisasyncclient.cpp deleted file mode 100644 index e09137a..0000000 --- a/src/redisclient/impl/redisasyncclient.cpp +++ /dev/null @@ -1,200 +0,0 @@ -/* - * Copyright (C) Alex Nekipelov (alex@nekipelov.net) - * License: MIT - */ - -#ifndef REDISASYNCCLIENT_REDISASYNCCLIENT_CPP -#define REDISASYNCCLIENT_REDISASYNCCLIENT_CPP - -#include -#include - -#include "redisclient/impl/throwerror.h" -#include "redisclient/redisasyncclient.h" - -namespace redisclient { - -RedisAsyncClient::RedisAsyncClient(boost::asio::io_service &ioService) - : pimpl(std::make_shared(ioService)) -{ - pimpl->errorHandler = std::bind(&RedisClientImpl::defaulErrorHandler, std::placeholders::_1); -} - -RedisAsyncClient::~RedisAsyncClient() -{ - pimpl->close(); -} - -void RedisAsyncClient::connect(const boost::asio::ip::tcp::endpoint &endpoint, - std::function handler) -{ - if( pimpl->state == State::Closed ) - { - pimpl->redisParser = RedisParser(); - std::move(pimpl->socket); - } - - if( pimpl->state == State::Unconnected || pimpl->state == State::Closed ) - { - pimpl->state = State::Connecting; - pimpl->socket.async_connect(endpoint, std::bind(&RedisClientImpl::handleAsyncConnect, - pimpl, std::placeholders::_1, std::move(handler))); - } - else - { - // FIXME: add correct error message - //std::stringstream ss; - //ss << "RedisAsyncClient::connect called on socket with state " << to_string(pimpl->state); - //handler(false, ss.str()); - handler(boost::system::error_code()); - } -} - -#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS - -void RedisAsyncClient::connect(const boost::asio::local::stream_protocol::endpoint &endpoint, - std::function handler) -{ - if( pimpl->state == State::Unconnected || pimpl->state == State::Closed ) - { - pimpl->state = State::Connecting; - pimpl->socket.async_connect(endpoint, std::bind(&RedisClientImpl::handleAsyncConnect, - pimpl, std::placeholders::_1, std::move(handler))); - } - else - { - // FIXME: add correct error message - //std::stringstream ss; - //ss << "RedisAsyncClient::connect called on socket with state " << to_string(pimpl->state); - //handler(false, ss.str()); - handler(boost::system::error_code()); - } -} - -#endif - -bool RedisAsyncClient::isConnected() const -{ - return pimpl->getState() == State::Connected || - pimpl->getState() == State::Subscribed; -} - -void RedisAsyncClient::disconnect() -{ - pimpl->close(); -} - -void RedisAsyncClient::installErrorHandler(std::function handler) -{ - pimpl->errorHandler = std::move(handler); -} - -void RedisAsyncClient::command(const std::string &cmd, std::deque args, - std::function handler) -{ - if(stateValid()) - { - args.emplace_front(cmd); - - pimpl->post(std::bind(&RedisClientImpl::doAsyncCommand, pimpl, - pimpl->makeCommand(args), std::move(handler))); - } -} - -RedisAsyncClient::Handle RedisAsyncClient::subscribe( - const std::string &channel, - std::function msg)> msgHandler, - std::function handler) -{ - auto handleId = pimpl->subscribe("subscribe", channel, msgHandler, handler); - return { handleId , channel }; -} - -RedisAsyncClient::Handle RedisAsyncClient::psubscribe( - const std::string &pattern, - std::function msg)> msgHandler, - std::function handler) -{ - auto handleId = pimpl->subscribe("psubscribe", pattern, msgHandler, handler); - return{ handleId , pattern }; -} - -void RedisAsyncClient::unsubscribe(const Handle &handle) -{ - pimpl->unsubscribe("unsubscribe", handle.id, handle.channel, dummyHandler); -} - -void RedisAsyncClient::punsubscribe(const Handle &handle) -{ - pimpl->unsubscribe("punsubscribe", handle.id, handle.channel, dummyHandler); -} - -void RedisAsyncClient::singleShotSubscribe(const std::string &channel, - std::function msg)> msgHandler, - std::function handler) -{ - pimpl->singleShotSubscribe("subscribe", channel, msgHandler, handler); -} - -void RedisAsyncClient::singleShotPSubscribe(const std::string &pattern, - std::function msg)> msgHandler, - std::function handler) -{ - pimpl->singleShotSubscribe("psubscribe", pattern, msgHandler, handler); -} - -void RedisAsyncClient::publish(const std::string &channel, const RedisBuffer &msg, - std::function handler) -{ - assert( pimpl->state == State::Connected ); - - static const std::string publishStr = "PUBLISH"; - - if( pimpl->state == State::Connected ) - { - std::deque items(3); - - items[0] = publishStr; - items[1] = channel; - items[2] = msg; - - pimpl->post(std::bind(&RedisClientImpl::doAsyncCommand, pimpl, - pimpl->makeCommand(items), std::move(handler))); - } - else - { - std::stringstream ss; - - ss << "RedisAsyncClient::command called with invalid state " - << to_string(pimpl->state); - - pimpl->errorHandler(ss.str()); - } -} - -RedisAsyncClient::State RedisAsyncClient::state() const -{ - return pimpl->getState(); -} - -bool RedisAsyncClient::stateValid() const -{ - assert( pimpl->state == State::Connected ); - - if( pimpl->state != State::Connected ) - { - std::stringstream ss; - - ss << "RedisAsyncClient::command called with invalid state " - << to_string(pimpl->state); - - pimpl->errorHandler(ss.str()); - return false; - } - - return true; -} - -} - -#endif // REDISASYNCCLIENT_REDISASYNCCLIENT_CPP diff --git a/src/redisclient/impl/redisclientimpl.cpp b/src/redisclient/impl/redisclientimpl.cpp deleted file mode 100644 index fc43856..0000000 --- a/src/redisclient/impl/redisclientimpl.cpp +++ /dev/null @@ -1,674 +0,0 @@ -/* - * Copyright (C) Alex Nekipelov (alex@nekipelov.net) - * License: MIT - */ - -#ifndef REDISCLIENT_REDISCLIENTIMPL_CPP -#define REDISCLIENT_REDISCLIENTIMPL_CPP - -#include - -#include - -#include "redisclientimpl.h" - -namespace -{ - static const char crlf[] = {'\r', '\n'}; - inline void bufferAppend(std::vector &vec, const std::string &s); - inline void bufferAppend(std::vector &vec, const std::vector &s); - inline void bufferAppend(std::vector &vec, const char *s); - inline void bufferAppend(std::vector &vec, char c); - template - inline void bufferAppend(std::vector &vec, const char (&s)[size]); - - inline void bufferAppend(std::vector &vec, const redisclient::RedisBuffer &buf) - { - if (buf.data.type() == typeid(std::string)) - bufferAppend(vec, boost::get(buf.data)); - else - bufferAppend(vec, boost::get>(buf.data)); - } - - inline void bufferAppend(std::vector &vec, const std::string &s) - { - vec.insert(vec.end(), s.begin(), s.end()); - } - - inline void bufferAppend(std::vector &vec, const std::vector &s) - { - vec.insert(vec.end(), s.begin(), s.end()); - } - - inline void bufferAppend(std::vector &vec, const char *s) - { - vec.insert(vec.end(), s, s + strlen(s)); - } - - inline void bufferAppend(std::vector &vec, char c) - { - vec.resize(vec.size() + 1); - vec[vec.size() - 1] = c; - } - - template - inline void bufferAppend(std::vector &vec, const char (&s)[size]) - { - vec.insert(vec.end(), s, s + size); - } - - ssize_t socketReadSomeImpl(int socket, char *buffer, size_t size, - size_t timeoutMsec) - { - struct timeval tv = {static_cast(timeoutMsec / 1000), - static_cast<__suseconds_t>((timeoutMsec % 1000) * 1000)}; - int result = setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); - - if (result != 0) - { - return result; - } - - pollfd pfd; - - pfd.fd = socket; - pfd.events = POLLIN; - - result = ::poll(&pfd, 1, timeoutMsec); - if (result > 0) - { - return recv(socket, buffer, size, MSG_DONTWAIT); - } - else - { - return result; - } - } - - size_t socketReadSome(int socket, boost::asio::mutable_buffer buffer, - const boost::posix_time::time_duration &timeout, - boost::system::error_code &ec) - { - size_t bytesRecv = 0; - size_t timeoutMsec = timeout.total_milliseconds(); - - for(;;) - { - ssize_t result = socketReadSomeImpl(socket, - boost::asio::buffer_cast(buffer) + bytesRecv, - boost::asio::buffer_size(buffer) - bytesRecv, timeoutMsec); - - if (result < 0) - { - if (errno == EINTR) - { - continue; - } - else - { - ec = boost::system::error_code(errno, - boost::asio::error::get_system_category()); - break; - } - } - else if (result == 0) - { - // boost::asio::error::connection_reset(); - // boost::asio::error::eof - ec = boost::asio::error::eof; - break; - } - else - { - bytesRecv += result; - break; - } - } - - return bytesRecv; - } - - - ssize_t socketWriteImpl(int socket, const char *buffer, size_t size, - size_t timeoutMsec) - { - struct timeval tv = {static_cast(timeoutMsec / 1000), - static_cast<__suseconds_t>((timeoutMsec % 1000) * 1000)}; - int result = setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); - - if (result != 0) - { - return result; - } - - pollfd pfd; - - pfd.fd = socket; - pfd.events = POLLOUT; - - result = ::poll(&pfd, 1, timeoutMsec); - if (result > 0) - { - return send(socket, buffer, size, 0); - } - else - { - return result; - } - } - - size_t socketWrite(int socket, boost::asio::const_buffer buffer, - const boost::posix_time::time_duration &timeout, - boost::system::error_code &ec) - { - size_t bytesSend = 0; - size_t timeoutMsec = timeout.total_milliseconds(); - - while(bytesSend < boost::asio::buffer_size(buffer)) - { - ssize_t result = socketWriteImpl(socket, - boost::asio::buffer_cast(buffer) + bytesSend, - boost::asio::buffer_size(buffer) - bytesSend, timeoutMsec); - - if (result < 0) - { - if (errno == EINTR) - { - continue; - } - else - { - ec = boost::system::error_code(errno, - boost::asio::error::get_system_category()); - break; - } - } - else if (result == 0) - { - // boost::asio::error::connection_reset(); - // boost::asio::error::eof - ec = boost::asio::error::eof; - break; - } - else - { - bytesSend += result; - } - } - - return bytesSend; - } - - size_t socketWrite(int socket, const std::vector &buffers, - const boost::posix_time::time_duration &timeout, - boost::system::error_code &ec) - { - size_t bytesSend = 0; - for(const auto &buffer: buffers) - { - bytesSend += socketWrite(socket, buffer, timeout, ec); - - if (ec) - break; - } - - return bytesSend; - } -} - -namespace redisclient { - -RedisClientImpl::RedisClientImpl(boost::asio::io_service &ioService_) - : ioService(ioService_), strand(ioService), socket(ioService), - bufSize(0),subscribeSeq(0), state(State::Unconnected) -{ -} - -RedisClientImpl::~RedisClientImpl() -{ - close(); -} - -void RedisClientImpl::close() noexcept -{ - boost::system::error_code ignored_ec; - - msgHandlers.clear(); - decltype(handlers)().swap(handlers); - - socket.cancel(ignored_ec); - socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignored_ec); - socket.close(ignored_ec); - - state = State::Closed; -} - -RedisClientImpl::State RedisClientImpl::getState() const -{ - return state; -} - -void RedisClientImpl::processMessage() -{ - socket.async_read_some(boost::asio::buffer(buf), - std::bind(&RedisClientImpl::asyncRead, - shared_from_this(), std::placeholders::_1, std::placeholders::_2)); -} - -void RedisClientImpl::doProcessMessage(RedisValue v) -{ - if( state == State::Subscribed ) - { - std::vector result = v.toArray(); - auto resultSize = result.size(); - - if( resultSize >= 3 ) - { - const RedisValue &command = result[0]; - const RedisValue &queueName = result[(resultSize == 3)?1:2]; - const RedisValue &value = result[(resultSize == 3)?2:3]; - const RedisValue &pattern = (resultSize == 4) ? result[1] : queueName; - - std::string cmd = command.toString(); - - if( cmd == "message" || cmd == "pmessage" ) - { - SingleShotHandlersMap::iterator it = singleShotMsgHandlers.find(pattern.toString()); - if( it != singleShotMsgHandlers.end() ) - { - strand.post(std::bind(it->second, value.toByteArray())); - singleShotMsgHandlers.erase(it); - } - - std::pair pair = - msgHandlers.equal_range(pattern.toString()); - for(MsgHandlersMap::iterator handlerIt = pair.first; - handlerIt != pair.second; ++handlerIt) - { - strand.post(std::bind(handlerIt->second.second, value.toByteArray())); - } - } - else if( handlers.empty() == false && - (cmd == "subscribe" || cmd == "unsubscribe" || - cmd == "psubscribe" || cmd == "punsubscribe") - ) - { - handlers.front()(std::move(v)); - handlers.pop(); - } - else - { - std::stringstream ss; - - ss << "[RedisClient] invalid command: " - << command.toString(); - - errorHandler(ss.str()); - return; - } - } - - else - { - errorHandler("[RedisClient] Protocol error"); - return; - } - } - else - { - if( handlers.empty() == false ) - { - handlers.front()(std::move(v)); - handlers.pop(); - } - else - { - std::stringstream ss; - - ss << "[RedisClient] unexpected message: " - << v.inspect(); - - errorHandler(ss.str()); - return; - } - } -} - -void RedisClientImpl::asyncWrite(const boost::system::error_code &ec, size_t) -{ - dataWrited.clear(); - - if( ec ) - { - errorHandler(ec.message()); - return; - } - - if( dataQueued.empty() == false ) - { - std::vector buffers(dataQueued.size()); - - for(size_t i = 0; i < dataQueued.size(); ++i) - { - buffers[i] = boost::asio::buffer(dataQueued[i]); - } - - std::swap(dataQueued, dataWrited); - - boost::asio::async_write(socket, buffers, - std::bind(&RedisClientImpl::asyncWrite, shared_from_this(), - std::placeholders::_1, std::placeholders::_2)); - } -} - -void RedisClientImpl::handleAsyncConnect(const boost::system::error_code &ec, - std::function handler) -{ - if( !ec ) - { - boost::system::error_code ec2; // Ignore errors in set_option - socket.set_option(boost::asio::ip::tcp::no_delay(true), ec2); - state = State::Connected; - handler(ec); - processMessage(); - } - else - { - state = State::Unconnected; - handler(ec); - } -} - -std::vector RedisClientImpl::makeCommand(const std::deque &items) -{ - std::vector result; - - bufferAppend(result, '*'); - bufferAppend(result, std::to_string(items.size())); - bufferAppend<>(result, crlf); - - for(const auto &item: items) - { - bufferAppend(result, '$'); - bufferAppend(result, std::to_string(item.size())); - bufferAppend<>(result, crlf); - bufferAppend(result, item); - bufferAppend<>(result, crlf); - } - - return result; -} -RedisValue RedisClientImpl::doSyncCommand(const std::deque &command, - const boost::posix_time::time_duration &timeout, - boost::system::error_code &ec) -{ - std::vector data = makeCommand(command); - socketWrite(socket.native_handle(), boost::asio::buffer(data), timeout, ec); - - if( ec ) - { - return RedisValue(); - } - - return syncReadResponse(timeout, ec); -} - -RedisValue RedisClientImpl::doSyncCommand(const std::deque> &commands, - const boost::posix_time::time_duration &timeout, - boost::system::error_code &ec) -{ - std::vector> data; - std::vector buffers; - - data.reserve(commands.size()); - buffers.reserve(commands.size()); - - for(const auto &command: commands) - { - data.push_back(makeCommand(command)); - buffers.push_back(boost::asio::buffer(data.back())); - } - - socketWrite(socket.native_handle(), buffers, timeout, ec); - - if( ec ) - { - return RedisValue(); - } - - std::vector responses; - - for(size_t i = 0; i < commands.size(); ++i) - { - responses.push_back(syncReadResponse(timeout, ec)); - - if (ec) - { - return RedisValue(); - } - } - - return RedisValue(std::move(responses)); -} - -RedisValue RedisClientImpl::syncReadResponse( - const boost::posix_time::time_duration &timeout, - boost::system::error_code &ec) -{ - for(;;) - { - if (bufSize == 0) - { - bufSize = socketReadSome(socket.native_handle(), - boost::asio::buffer(buf), timeout, ec); - - if (ec) - return RedisValue(); - } - - for(size_t pos = 0; pos < bufSize;) - { - std::pair result = - redisParser.parse(buf.data() + pos, bufSize - pos); - - pos += result.first; - - ::memmove(buf.data(), buf.data() + pos, bufSize - pos); - bufSize -= pos; - - if( result.second == RedisParser::Completed ) - { - return redisParser.result(); - } - else if( result.second == RedisParser::Incompleted ) - { - continue; - } - else - { - errorHandler("[RedisClient] Parser error"); - return RedisValue(); - } - } - } -} - -void RedisClientImpl::doAsyncCommand(std::vector buff, - std::function handler) -{ - handlers.push( std::move(handler) ); - dataQueued.push_back(std::move(buff)); - - if( dataWrited.empty() ) - { - // start transmit process - asyncWrite(boost::system::error_code(), 0); - } -} - -void RedisClientImpl::asyncRead(const boost::system::error_code &ec, const size_t size) -{ - if( ec || size == 0 ) - { - if (ec != boost::asio::error::operation_aborted) - { - errorHandler(ec.message()); - } - return; - } - - for(size_t pos = 0; pos < size;) - { - std::pair result = redisParser.parse(buf.data() + pos, size - pos); - - if( result.second == RedisParser::Completed ) - { - doProcessMessage(redisParser.result()); - } - else if( result.second == RedisParser::Incompleted ) - { - processMessage(); - return; - } - else - { - errorHandler("[RedisClient] Parser error"); - return; - } - - pos += result.first; - } - - processMessage(); -} - -void RedisClientImpl::onRedisError(const RedisValue &v) -{ - errorHandler(v.toString()); -} - -void RedisClientImpl::defaulErrorHandler(const std::string &s) -{ - throw std::runtime_error(s); -} - -size_t RedisClientImpl::subscribe( - const std::string &command, - const std::string &channel, - std::function msg)> msgHandler, - std::function handler) -{ - assert(state == State::Connected || - state == State::Subscribed); - - if (state == State::Connected || state == State::Subscribed) - { - std::deque items{ command, channel }; - - post(std::bind(&RedisClientImpl::doAsyncCommand, this, makeCommand(items), std::move(handler))); - msgHandlers.insert(std::make_pair(channel, std::make_pair(subscribeSeq, std::move(msgHandler)))); - state = State::Subscribed; - - return subscribeSeq++; - } - else - { - std::stringstream ss; - - ss << "RedisClientImpl::subscribe called with invalid state " - << to_string(state); - - errorHandler(ss.str()); - return 0; - } -} - -void RedisClientImpl::singleShotSubscribe( - const std::string &command, - const std::string &channel, - std::function msg)> msgHandler, - std::function handler) -{ - assert(state == State::Connected || - state == State::Subscribed); - - if (state == State::Connected || - state == State::Subscribed) - { - std::deque items{ command, channel }; - - post(std::bind(&RedisClientImpl::doAsyncCommand, this, makeCommand(items), std::move(handler))); - singleShotMsgHandlers.insert(std::make_pair(channel, std::move(msgHandler))); - state = State::Subscribed; - } - else - { - std::stringstream ss; - - ss << "RedisClientImpl::singleShotSubscribe called with invalid state " - << to_string(state); - - errorHandler(ss.str()); - } -} - -void RedisClientImpl::unsubscribe(const std::string &command, - size_t handleId, - const std::string &channel, - std::function handler) -{ -#ifdef DEBUG - static int recursion = 0; - assert(recursion++ == 0); -#endif - - assert(state == State::Connected || - state == State::Subscribed); - - if (state == State::Connected || - state == State::Subscribed) - { - // Remove subscribe-handler - typedef RedisClientImpl::MsgHandlersMap::iterator iterator; - std::pair pair = msgHandlers.equal_range(channel); - - for (iterator it = pair.first; it != pair.second;) - { - if (it->second.first == handleId) - { - msgHandlers.erase(it++); - } - else - { - ++it; - } - } - - std::deque items{ command, channel }; - - // Unsubscribe command for Redis - post(std::bind(&RedisClientImpl::doAsyncCommand, this, - makeCommand(items), handler)); - } - else - { - std::stringstream ss; - - ss << "RedisClientImpl::unsubscribe called with invalid state " - << to_string(state); - -#ifdef DEBUG - --recursion; -#endif - errorHandler(ss.str()); - return; - } - -#ifdef DEBUG - --recursion; -#endif -} - -} - -#endif // REDISCLIENT_REDISCLIENTIMPL_CPP diff --git a/src/redisclient/impl/redisclientimpl.h b/src/redisclient/impl/redisclientimpl.h deleted file mode 100644 index 16a9b4a..0000000 --- a/src/redisclient/impl/redisclientimpl.h +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Copyright (C) Alex Nekipelov (alex@nekipelov.net) - * License: MIT - */ - -#ifndef REDISCLIENT_REDISCLIENTIMPL_H -#define REDISCLIENT_REDISCLIENTIMPL_H - -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include - -#include "redisclient/redisparser.h" -#include "redisclient/redisbuffer.h" -#include "redisclient/config.h" - -namespace redisclient { - -class RedisClientImpl : public std::enable_shared_from_this { -public: - enum class State { - Unconnected, - Connecting, - Connected, - Subscribed, - Closed - }; - - REDIS_CLIENT_DECL RedisClientImpl(boost::asio::io_service &ioService); - REDIS_CLIENT_DECL ~RedisClientImpl(); - - REDIS_CLIENT_DECL void handleAsyncConnect( - const boost::system::error_code &ec, - std::function handler); - - REDIS_CLIENT_DECL size_t subscribe(const std::string &command, - const std::string &channel, - std::function msg)> msgHandler, - std::function handler); - - REDIS_CLIENT_DECL void singleShotSubscribe(const std::string &command, - const std::string &channel, - std::function msg)> msgHandler, - std::function handler); - - REDIS_CLIENT_DECL void unsubscribe(const std::string &command, - size_t handle_id, const std::string &channel, - std::function handler); - - REDIS_CLIENT_DECL void close() noexcept; - - REDIS_CLIENT_DECL State getState() const; - - REDIS_CLIENT_DECL static std::vector makeCommand(const std::deque &items); - - REDIS_CLIENT_DECL RedisValue doSyncCommand(const std::deque &command, - const boost::posix_time::time_duration &timeout, - boost::system::error_code &ec); - REDIS_CLIENT_DECL RedisValue doSyncCommand(const std::deque> &commands, - const boost::posix_time::time_duration &timeout, - boost::system::error_code &ec); - REDIS_CLIENT_DECL RedisValue syncReadResponse( - const boost::posix_time::time_duration &timeout, - boost::system::error_code &ec); - - REDIS_CLIENT_DECL void doAsyncCommand( - std::vector buff, - std::function handler); - - REDIS_CLIENT_DECL void sendNextCommand(); - REDIS_CLIENT_DECL void processMessage(); - REDIS_CLIENT_DECL void doProcessMessage(RedisValue v); - REDIS_CLIENT_DECL void asyncWrite(const boost::system::error_code &ec, const size_t); - REDIS_CLIENT_DECL void asyncRead(const boost::system::error_code &ec, const size_t); - - REDIS_CLIENT_DECL void onRedisError(const RedisValue &); - REDIS_CLIENT_DECL static void defaulErrorHandler(const std::string &s); - - template - inline void post(const Handler &handler); - - boost::asio::io_service &ioService; - boost::asio::io_service::strand strand; - boost::asio::generic::stream_protocol::socket socket; - RedisParser redisParser; - boost::array buf; - size_t bufSize; // only for sync - size_t subscribeSeq; - - typedef std::pair &buf)> > MsgHandlerType; - typedef std::function &buf)> SingleShotHandlerType; - - typedef std::multimap MsgHandlersMap; - typedef std::multimap SingleShotHandlersMap; - - std::queue > handlers; - std::deque> dataWrited; - std::deque> dataQueued; - MsgHandlersMap msgHandlers; - SingleShotHandlersMap singleShotMsgHandlers; - - std::function errorHandler; - State state; -}; - -template -inline void RedisClientImpl::post(const Handler &handler) -{ - strand.post(handler); -} - -inline std::string to_string(RedisClientImpl::State state) -{ - switch(state) - { - case RedisClientImpl::State::Unconnected: - return "Unconnected"; - break; - case RedisClientImpl::State::Connecting: - return "Connecting"; - break; - case RedisClientImpl::State::Connected: - return "Connected"; - break; - case RedisClientImpl::State::Subscribed: - return "Subscribed"; - break; - case RedisClientImpl::State::Closed: - return "Closed"; - break; - } - - return "Invalid"; -} -} - - -#ifdef REDIS_CLIENT_HEADER_ONLY -#include "redisclientimpl.cpp" -#endif - -#endif // REDISCLIENT_REDISCLIENTIMPL_H diff --git a/src/redisclient/impl/redisparser.cpp b/src/redisclient/impl/redisparser.cpp deleted file mode 100644 index a3717be..0000000 --- a/src/redisclient/impl/redisparser.cpp +++ /dev/null @@ -1,432 +0,0 @@ -/* - * Copyright (C) Alex Nekipelov (alex@nekipelov.net) - * License: MIT - */ - -#ifndef REDISCLIENT_REDISPARSER_CPP -#define REDISCLIENT_REDISPARSER_CPP - -#include -#include - -#ifdef DEBUG_REDIS_PARSER -#include -#endif - -#include "redisclient/redisparser.h" - -namespace redisclient { - -RedisParser::RedisParser() - : bulkSize(0) -{ - buf.reserve(64); -} - -std::pair RedisParser::parse(const char *ptr, size_t size) -{ - return RedisParser::parseChunk(ptr, size); -} - -std::pair RedisParser::parseChunk(const char *ptr, size_t size) -{ - size_t position = 0; - State state = Start; - - if (!states.empty()) - { - state = states.top(); - states.pop(); - } - - while(position < size) - { - char c = ptr[position++]; -#ifdef DEBUG_REDIS_PARSER - std::cerr << "state: " << state << ", c: " << c << "\n"; -#endif - - switch(state) - { - case StartArray: - case Start: - buf.clear(); - switch(c) - { - case stringReply: - state = String; - break; - case errorReply: - state = ErrorString; - break; - case integerReply: - state = Integer; - break; - case bulkReply: - state = BulkSize; - bulkSize = 0; - break; - case arrayReply: - state = ArraySize; - break; - default: - return std::make_pair(position, Error); - } - break; - case String: - if( c == '\r' ) - { - state = StringLF; - } - else if( isChar(c) && !isControl(c) ) - { - buf.push_back(c); - } - else - { - std::stack().swap(states); - return std::make_pair(position, Error); - } - break; - case ErrorString: - if( c == '\r' ) - { - state = ErrorLF; - } - else if( isChar(c) && !isControl(c) ) - { - buf.push_back(c); - } - else - { - std::stack().swap(states); - return std::make_pair(position, Error); - } - break; - case BulkSize: - if( c == '\r' ) - { - if( buf.empty() ) - { - std::stack().swap(states); - return std::make_pair(position, Error); - } - else - { - state = BulkSizeLF; - } - } - else if( isdigit(c) || c == '-' ) - { - buf.push_back(c); - } - else - { - std::stack().swap(states); - return std::make_pair(position, Error); - } - break; - case StringLF: - if( c == '\n') - { - state = Start; - redisValue = RedisValue(buf); - } - else - { - std::stack().swap(states); - return std::make_pair(position, Error); - } - break; - case ErrorLF: - if( c == '\n') - { - state = Start; - RedisValue::ErrorTag tag; - redisValue = RedisValue(buf, tag); - } - else - { - std::stack().swap(states); - return std::make_pair(position, Error); - } - break; - case BulkSizeLF: - if( c == '\n' ) - { - bulkSize = bufToLong(buf.data(), buf.size()); - buf.clear(); - - if( bulkSize == -1 ) - { - state = Start; - redisValue = RedisValue(); // Nil - } - else if( bulkSize == 0 ) - { - state = BulkCR; - } - else if( bulkSize < 0 ) - { - std::stack().swap(states); - return std::make_pair(position, Error); - } - else - { - buf.reserve(bulkSize); - - long int available = size - position; - long int canRead = std::min(bulkSize, available); - - if( canRead > 0 ) - { - buf.assign(ptr + position, ptr + position + canRead); - position += canRead; - bulkSize -= canRead; - } - - - if (bulkSize > 0) - { - state = Bulk; - } - else - { - state = BulkCR; - } - } - } - else - { - std::stack().swap(states); - return std::make_pair(position, Error); - } - break; - case Bulk: { - assert( bulkSize > 0 ); - - long int available = size - position + 1; - long int canRead = std::min(available, bulkSize); - - buf.insert(buf.end(), ptr + position - 1, ptr + position - 1 + canRead); - bulkSize -= canRead; - position += canRead - 1; - - if( bulkSize == 0 ) - { - state = BulkCR; - } - break; - } - case BulkCR: - if( c == '\r') - { - state = BulkLF; - } - else - { - std::stack().swap(states); - return std::make_pair(position, Error); - } - break; - case BulkLF: - if( c == '\n') - { - state = Start; - redisValue = RedisValue(buf); - } - else - { - std::stack().swap(states); - return std::make_pair(position, Error); - } - break; - case ArraySize: - if( c == '\r' ) - { - if( buf.empty() ) - { - std::stack().swap(states); - return std::make_pair(position, Error); - } - else - { - state = ArraySizeLF; - } - } - else if( isdigit(c) || c == '-' ) - { - buf.push_back(c); - } - else - { - std::stack().swap(states); - return std::make_pair(position, Error); - } - break; - case ArraySizeLF: - if( c == '\n' ) - { - int64_t arraySize = bufToLong(buf.data(), buf.size()); - std::vector array; - - if( arraySize == -1 ) - { - state = Start; - redisValue = RedisValue(); // Nil value - } - else if( arraySize == 0 ) - { - state = Start; - redisValue = RedisValue(std::move(array)); // Empty array - } - else if( arraySize < 0 ) - { - std::stack().swap(states); - return std::make_pair(position, Error); - } - else - { - array.reserve(arraySize); - arraySizes.push(arraySize); - arrayValues.push(std::move(array)); - - state = StartArray; - } - } - else - { - std::stack().swap(states); - return std::make_pair(position, Error); - } - break; - case Integer: - if( c == '\r' ) - { - if( buf.empty() ) - { - std::stack().swap(states); - return std::make_pair(position, Error); - } - else - { - state = IntegerLF; - } - } - else if( isdigit(c) || c == '-' ) - { - buf.push_back(c); - } - else - { - std::stack().swap(states); - return std::make_pair(position, Error); - } - break; - case IntegerLF: - if( c == '\n' ) - { - int64_t value = bufToLong(buf.data(), buf.size()); - - buf.clear(); - redisValue = RedisValue(value); - state = Start; - } - else - { - std::stack().swap(states); - return std::make_pair(position, Error); - } - break; - default: - std::stack().swap(states); - return std::make_pair(position, Error); - } - - - if (state == Start) - { - if (!arraySizes.empty()) - { - assert(arraySizes.size() > 0); - arrayValues.top().getArray().push_back(redisValue); - - while(!arraySizes.empty() && --arraySizes.top() == 0) - { - arraySizes.pop(); - redisValue = std::move(arrayValues.top()); - arrayValues.pop(); - - if (!arraySizes.empty()) - arrayValues.top().getArray().push_back(redisValue); - } - } - - - if (arraySizes.empty()) - { - // done - break; - } - } - } - - if (arraySizes.empty() && state == Start) - { - return std::make_pair(position, Completed); - } - else - { - states.push(state); - return std::make_pair(position, Incompleted); - } -} - -RedisValue RedisParser::result() -{ - return std::move(redisValue); -} - -/* - * Convert string to long. I can't use atol/strtol because it - * work only with null terminated string. I can use temporary - * std::string object but that is slower then bufToLong. - */ -long int RedisParser::bufToLong(const char *str, size_t size) -{ - long int value = 0; - bool sign = false; - - if( str == nullptr || size == 0 ) - { - return 0; - } - - if( *str == '-' ) - { - sign = true; - ++str; - --size; - - if( size == 0 ) { - return 0; - } - } - - for(const char *end = str + size; str != end; ++str) - { - char c = *str; - - // char must be valid, already checked in the parser - assert(c >= '0' && c <= '9'); - - value = value * 10; - value += c - '0'; - } - - return sign ? -value : value; -} - -} - -#endif // REDISCLIENT_REDISPARSER_CPP diff --git a/src/redisclient/impl/redissyncclient.cpp b/src/redisclient/impl/redissyncclient.cpp deleted file mode 100644 index 3583c3e..0000000 --- a/src/redisclient/impl/redissyncclient.cpp +++ /dev/null @@ -1,321 +0,0 @@ -/* - * Copyright (C) Alex Nekipelov (alex@nekipelov.net) - * License: MIT - */ - -#ifndef REDISCLIENT_REDISSYNCCLIENT_CPP -#define REDISCLIENT_REDISSYNCCLIENT_CPP - -#include -#include - -#include "redisclient/redissyncclient.h" -#include "redisclient/pipeline.h" -#include "redisclient/impl/throwerror.h" - -namespace redisclient { - -RedisSyncClient::RedisSyncClient(boost::asio::io_service &ioService) - : pimpl(std::make_shared(ioService)), - connectTimeout(boost::posix_time::hours(365 * 24)), - commandTimeout(boost::posix_time::hours(365 * 24)), - tcpNoDelay(true), tcpKeepAlive(false) -{ - pimpl->errorHandler = std::bind(&RedisClientImpl::defaulErrorHandler, std::placeholders::_1); -} - -RedisSyncClient::RedisSyncClient(RedisSyncClient &&other) - : pimpl(std::move(other.pimpl)), - connectTimeout(std::move(other.connectTimeout)), - commandTimeout(std::move(other.commandTimeout)), - tcpNoDelay(std::move(other.tcpNoDelay)), - tcpKeepAlive(std::move(other.tcpKeepAlive)) -{ -} - - -RedisSyncClient::~RedisSyncClient() -{ - if (pimpl) - pimpl->close(); -} - -void RedisSyncClient::connect(const boost::asio::ip::tcp::endpoint &endpoint) -{ - boost::system::error_code ec; - - connect(endpoint, ec); - detail::throwIfError(ec); -} - -void RedisSyncClient::connect(const boost::asio::ip::tcp::endpoint &endpoint, - boost::system::error_code &ec) -{ - pimpl->socket.open(endpoint.protocol(), ec); - - if (!ec && tcpNoDelay) - pimpl->socket.set_option(boost::asio::ip::tcp::no_delay(true), ec); - - // TODO keep alive option - - // boost asio does not support `connect` with timeout - int socket = pimpl->socket.native_handle(); - struct sockaddr_in addr; - - addr.sin_family = AF_INET; - addr.sin_port = htons(endpoint.port()); - addr.sin_addr.s_addr = inet_addr(endpoint.address().to_string().c_str()); - - // Set non-blocking - int arg = 0; - if ((arg = fcntl(socket, F_GETFL, NULL)) < 0) - { - ec = boost::system::error_code(errno, - boost::asio::error::get_system_category()); - return; - } - - arg |= O_NONBLOCK; - - if (fcntl(socket, F_SETFL, arg) < 0) - { - ec = boost::system::error_code(errno, - boost::asio::error::get_system_category()); - return; - } - - // connecting - int result = ::connect(socket, reinterpret_cast(&addr), sizeof(addr)); - if (result < 0) - { - if (errno == EINPROGRESS) - { - for(;;) - { - //selecting - pollfd pfd; - pfd.fd = socket; - pfd.events = POLLOUT; - - result = ::poll(&pfd, 1, connectTimeout.total_milliseconds()); - - if (result < 0) - { - if (errno == EINTR) - { - // try again - continue; - } - else - { - ec = boost::system::error_code(errno, - boost::asio::error::get_system_category()); - return; - } - } - else if (result > 0) - { - // check for error - int valopt; - socklen_t optlen = sizeof(valopt); - - - if (getsockopt(socket, SOL_SOCKET, SO_ERROR, - reinterpret_cast(&valopt), &optlen ) < 0) - { - ec = boost::system::error_code(errno, - boost::asio::error::get_system_category()); - return; - } - - if (valopt) - { - ec = boost::system::error_code(valopt, - boost::asio::error::get_system_category()); - return; - } - - break; - } - else - { - // timeout - ec = boost::system::error_code(ETIMEDOUT, - boost::asio::error::get_system_category()); - return; - } - } - } - else - { - ec = boost::system::error_code(errno, - boost::asio::error::get_system_category()); - return; - } - } - - if ((arg = fcntl(socket, F_GETFL, NULL)) < 0) - { - ec = boost::system::error_code(errno, - boost::asio::error::get_system_category()); - return; - } - - arg &= (~O_NONBLOCK); - - if (fcntl(socket, F_SETFL, arg) < 0) - { - ec = boost::system::error_code(errno, - boost::asio::error::get_system_category()); - } - - if (!ec) - pimpl->state = State::Connected; -} - -#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS - -void RedisSyncClient::connect(const boost::asio::local::stream_protocol::endpoint &endpoint) -{ - boost::system::error_code ec; - - connect(endpoint, ec); - detail::throwIfError(ec); -} - -void RedisSyncClient::connect(const boost::asio::local::stream_protocol::endpoint &endpoint, - boost::system::error_code &ec) -{ - pimpl->socket.open(endpoint.protocol(), ec); - - if (!ec) - pimpl->socket.connect(endpoint, ec); - - if (!ec) - pimpl->state = State::Connected; -} - -#endif - -bool RedisSyncClient::isConnected() const -{ - return pimpl->getState() == State::Connected || - pimpl->getState() == State::Subscribed; -} - -void RedisSyncClient::disconnect() -{ - pimpl->close(); -} - -void RedisSyncClient::installErrorHandler( - std::function handler) -{ - pimpl->errorHandler = std::move(handler); -} - -RedisValue RedisSyncClient::command(std::string cmd, std::deque args) -{ - boost::system::error_code ec; - RedisValue result = command(std::move(cmd), std::move(args), ec); - - detail::throwIfError(ec); - return result; -} - -RedisValue RedisSyncClient::command(std::string cmd, std::deque args, - boost::system::error_code &ec) -{ - if(stateValid()) - { - args.push_front(std::move(cmd)); - - return pimpl->doSyncCommand(args, commandTimeout, ec); - } - else - { - return RedisValue(); - } -} - -Pipeline RedisSyncClient::pipelined() -{ - Pipeline pipe(*this); - return pipe; -} - -RedisValue RedisSyncClient::pipelined(std::deque> commands) -{ - boost::system::error_code ec; - RedisValue result = pipelined(std::move(commands), ec); - - detail::throwIfError(ec); - return result; -} - -RedisValue RedisSyncClient::pipelined(std::deque> commands, - boost::system::error_code &ec) -{ - if(stateValid()) - { - return pimpl->doSyncCommand(commands, commandTimeout, ec); - } - else - { - return RedisValue(); - } -} - -RedisSyncClient::State RedisSyncClient::state() const -{ - return pimpl->getState(); -} - -bool RedisSyncClient::stateValid() const -{ - assert( state() == State::Connected ); - - if( state() != State::Connected ) - { - std::stringstream ss; - - ss << "RedisClient::command called with invalid state " - << to_string(state()); - - pimpl->errorHandler(ss.str()); - return false; - } - - return true; -} - -RedisSyncClient &RedisSyncClient::setConnectTimeout( - const boost::posix_time::time_duration &timeout) -{ - connectTimeout = timeout; - return *this; -} - - -RedisSyncClient &RedisSyncClient::setCommandTimeout( - const boost::posix_time::time_duration &timeout) -{ - commandTimeout = timeout; - return *this; -} - -RedisSyncClient &RedisSyncClient::setTcpNoDelay(bool enable) -{ - tcpNoDelay = enable; - return *this; -} - -RedisSyncClient &RedisSyncClient::setTcpKeepAlive(bool enable) -{ - tcpKeepAlive = enable; - return *this; -} - -} - -#endif // REDISCLIENT_REDISSYNCCLIENT_CPP diff --git a/src/redisclient/impl/redisvalue.cpp b/src/redisclient/impl/redisvalue.cpp deleted file mode 100644 index 451f9f8..0000000 --- a/src/redisclient/impl/redisvalue.cpp +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Copyright (C) Alex Nekipelov (alex@nekipelov.net) - * License: MIT - */ - -#ifndef REDISCLIENT_REDISVALUE_CPP -#define REDISCLIENT_REDISVALUE_CPP - -#include - -#include "redisclient/redisvalue.h" - -namespace redisclient { - -RedisValue::RedisValue() - : value(NullTag()), error(false) -{ -} - -RedisValue::RedisValue(RedisValue &&other) - : value(std::move(other.value)), error(other.error) -{ -} - -RedisValue::RedisValue(int64_t i) - : value(i), error(false) -{ -} - -RedisValue::RedisValue(const char *s) - : value( std::vector(s, s + strlen(s)) ), error(false) -{ -} - -RedisValue::RedisValue(const std::string &s) - : value( std::vector(s.begin(), s.end()) ), error(false) -{ -} - -RedisValue::RedisValue(std::vector buf) - : value(std::move(buf)), error(false) -{ -} - -RedisValue::RedisValue(std::vector buf, struct ErrorTag) - : value(std::move(buf)), error(true) -{ -} - -RedisValue::RedisValue(std::vector array) - : value(std::move(array)), error(false) -{ -} - -std::vector RedisValue::toArray() const -{ - return castTo< std::vector >(); -} - -std::string RedisValue::toString() const -{ - const std::vector &buf = toByteArray(); - return std::string(buf.begin(), buf.end()); -} - -std::vector RedisValue::toByteArray() const -{ - return castTo >(); -} - -int64_t RedisValue::toInt() const -{ - return castTo(); -} - -std::string RedisValue::inspect() const -{ - if( isError() ) - { - static std::string err = "error: "; - std::string result; - - result = err; - result += toString(); - - return result; - } - else if( isNull() ) - { - static std::string null = "(null)"; - return null; - } - else if( isInt() ) - { - return std::to_string(toInt()); - } - else if( isString() ) - { - return toString(); - } - else - { - std::vector values = toArray(); - std::string result = "["; - - if( values.empty() == false ) - { - for(size_t i = 0; i < values.size(); ++i) - { - result += values[i].inspect(); - result += ", "; - } - - result.resize(result.size() - 1); - result[result.size() - 1] = ']'; - } - else - { - result += ']'; - } - - return result; - } -} - -bool RedisValue::isOk() const -{ - return !isError(); -} - -bool RedisValue::isError() const -{ - return error; -} - -bool RedisValue::isNull() const -{ - return typeEq(); -} - -bool RedisValue::isInt() const -{ - return typeEq(); -} - -bool RedisValue::isString() const -{ - return typeEq >(); -} - -bool RedisValue::isByteArray() const -{ - return typeEq >(); -} - -bool RedisValue::isArray() const -{ - return typeEq< std::vector >(); -} - -std::vector &RedisValue::getByteArray() -{ - assert(isByteArray()); - return boost::get>(value); -} - -const std::vector &RedisValue::getByteArray() const -{ - assert(isByteArray()); - return boost::get>(value); -} - -std::vector &RedisValue::getArray() -{ - assert(isArray()); - return boost::get>(value); -} - -const std::vector &RedisValue::getArray() const -{ - assert(isArray()); - return boost::get>(value); -} - -bool RedisValue::operator == (const RedisValue &rhs) const -{ - return value == rhs.value; -} - -bool RedisValue::operator != (const RedisValue &rhs) const -{ - return !(value == rhs.value); -} - -} - -#endif // REDISCLIENT_REDISVALUE_CPP - diff --git a/src/redisclient/impl/throwerror.h b/src/redisclient/impl/throwerror.h deleted file mode 100644 index d9ad3c7..0000000 --- a/src/redisclient/impl/throwerror.h +++ /dev/null @@ -1,28 +0,0 @@ -#pragma once - -#include -#include - -namespace redisclient -{ - -namespace detail -{ - -inline void throwError(const boost::system::error_code &ec) -{ - boost::system::system_error error(ec); - throw error; -} - -inline void throwIfError(const boost::system::error_code &ec) -{ - if (ec) - { - throwError(ec); - } -} - -} - -} diff --git a/src/redisclient/pipeline.h b/src/redisclient/pipeline.h deleted file mode 100644 index 4cc2791..0000000 --- a/src/redisclient/pipeline.h +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright (C) Alex Nekipelov (alex@nekipelov.net) - * License: MIT - */ - -#pragma once - -#include -#include - -#include "redisbuffer.h" -#include "config.h" - -namespace redisclient -{ - -class RedisSyncClient; -class RedisValue; - -// See https://redis.io/topics/pipelining. -class Pipeline -{ -public: - REDIS_CLIENT_DECL Pipeline(RedisSyncClient &client); - - // add command to pipe - REDIS_CLIENT_DECL Pipeline &command(std::string cmd, std::deque args); - - // Sends all commands to the redis server. - // For every request command will get response value. - // Example: - // - // Pipeline pipe(redis); - // - // pipe.command("GET", {"foo"}) - // .command("GET", {"bar"}) - // .command("GET", {"more"}); - // - // std::vector result = pipe.finish().toArray(); - // - // result[0]; // value of the key "foo" - // result[1]; // value of the key "bar" - // result[2]; // value of the key "more" - // - REDIS_CLIENT_DECL RedisValue finish(); - REDIS_CLIENT_DECL RedisValue finish(boost::system::error_code &ec); - -private: - std::deque> commands; - RedisSyncClient &client; -}; - -} - -#ifdef REDIS_CLIENT_HEADER_ONLY -#include "redisclient/impl/pipeline.cpp" -#endif - diff --git a/src/redisclient/redisasyncclient.h b/src/redisclient/redisasyncclient.h deleted file mode 100644 index 7cf6f6a..0000000 --- a/src/redisclient/redisasyncclient.h +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Copyright (C) Alex Nekipelov (alex@nekipelov.net) - * License: MIT - */ - -#ifndef REDISASYNCCLIENT_REDISCLIENT_H -#define REDISASYNCCLIENT_REDISCLIENT_H - -#include -#include - -#include -#include -#include -#include - -#include "redisclient/impl/redisclientimpl.h" -#include "redisvalue.h" -#include "redisbuffer.h" -#include "config.h" - -namespace redisclient { - -class RedisClientImpl; - -class RedisAsyncClient : boost::noncopyable { -public: - // Subscribe handle. - struct Handle { - size_t id; - std::string channel; - }; - - typedef RedisClientImpl::State State; - - REDIS_CLIENT_DECL RedisAsyncClient(boost::asio::io_service &ioService); - REDIS_CLIENT_DECL ~RedisAsyncClient(); - - // Connect to redis server - REDIS_CLIENT_DECL void connect( - const boost::asio::ip::tcp::endpoint &endpoint, - std::function handler); - -#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS - REDIS_CLIENT_DECL void connect( - const boost::asio::local::stream_protocol::endpoint &endpoint, - std::function handler); -#endif - - // Return true if is connected to redis. - REDIS_CLIENT_DECL bool isConnected() const; - - // Return connection state. See RedisClientImpl::State. - REDIS_CLIENT_DECL State state() const; - - // disconnect from redis and clear command queue - REDIS_CLIENT_DECL void disconnect(); - - // Set custom error handler. - REDIS_CLIENT_DECL void installErrorHandler( - std::function handler); - - // Execute command on Redis server with the list of arguments. - REDIS_CLIENT_DECL void command( - const std::string &cmd, std::deque args, - std::function handler = dummyHandler); - - // Subscribe to channel. Handler msgHandler will be called - // when someone publish message on channel. Call unsubscribe - // to stop the subscription. - REDIS_CLIENT_DECL Handle subscribe(const std::string &channelName, - std::function msg)> msgHandler, - std::function handler = &dummyHandler); - - - REDIS_CLIENT_DECL Handle psubscribe(const std::string &pattern, - std::function msg)> msgHandler, - std::function handler = &dummyHandler); - - // Unsubscribe - REDIS_CLIENT_DECL void unsubscribe(const Handle &handle); - REDIS_CLIENT_DECL void punsubscribe(const Handle &handle); - - // Subscribe to channel. Handler msgHandler will be called - // when someone publish message on channel; it will be - // unsubscribed after call. - REDIS_CLIENT_DECL void singleShotSubscribe( - const std::string &channel, - std::function msg)> msgHandler, - std::function handler = &dummyHandler); - - REDIS_CLIENT_DECL void singleShotPSubscribe( - const std::string &channel, - std::function msg)> msgHandler, - std::function handler = &dummyHandler); - - // Publish message on channel. - REDIS_CLIENT_DECL void publish( - const std::string &channel, const RedisBuffer &msg, - std::function handler = &dummyHandler); - - REDIS_CLIENT_DECL static void dummyHandler(RedisValue) {} - -protected: - REDIS_CLIENT_DECL bool stateValid() const; - -private: - std::shared_ptr pimpl; -}; - -} - -#ifdef REDIS_CLIENT_HEADER_ONLY -#include "redisclient/impl/redisasyncclient.cpp" -#endif - -#endif // REDISASYNCCLIENT_REDISCLIENT_H diff --git a/src/redisclient/redisbuffer.h b/src/redisclient/redisbuffer.h deleted file mode 100644 index a520923..0000000 --- a/src/redisclient/redisbuffer.h +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright (C) Alex Nekipelov (alex@nekipelov.net) - * License: MIT - */ - - -#ifndef REDISSYNCCLIENT_REDISBUFFER_H -#define REDISSYNCCLIENT_REDISBUFFER_H - -#include - -#include -#include - -#include "config.h" - -namespace redisclient { - -struct RedisBuffer -{ - RedisBuffer() = default; - inline RedisBuffer(const char *ptr, size_t dataSize); - inline RedisBuffer(const char *s); - inline RedisBuffer(std::string s); - inline RedisBuffer(std::vector buf); - - inline size_t size() const; - - boost::variant> data; -}; - - -RedisBuffer::RedisBuffer(const char *ptr, size_t dataSize) - : data(std::vector(ptr, ptr + dataSize)) -{ -} - -RedisBuffer::RedisBuffer(const char *s) - : data(std::string(s)) -{ -} - -RedisBuffer::RedisBuffer(std::string s) - : data(std::move(s)) -{ -} - -RedisBuffer::RedisBuffer(std::vector buf) - : data(std::move(buf)) -{ -} - -size_t RedisBuffer::size() const -{ - if (data.type() == typeid(std::string)) - return boost::get(data).size(); - else - return boost::get>(data).size(); -} - -} - -#endif //REDISSYNCCLIENT_REDISBUFFER_H - diff --git a/src/redisclient/redisparser.h b/src/redisclient/redisparser.h deleted file mode 100644 index 40841fa..0000000 --- a/src/redisclient/redisparser.h +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright (C) Alex Nekipelov (alex@nekipelov.net) - * License: MIT - */ - -#ifndef REDISCLIENT_REDISPARSER_H -#define REDISCLIENT_REDISPARSER_H - -#include -#include -#include - -#include "redisvalue.h" -#include "config.h" - -namespace redisclient { - -class RedisParser -{ -public: - REDIS_CLIENT_DECL RedisParser(); - - enum ParseResult { - Completed, - Incompleted, - Error, - }; - - REDIS_CLIENT_DECL std::pair parse(const char *ptr, size_t size); - - REDIS_CLIENT_DECL RedisValue result(); - -protected: - REDIS_CLIENT_DECL std::pair parseChunk(const char *ptr, size_t size); - - inline bool isChar(int c) - { - return c >= 0 && c <= 127; - } - - inline bool isControl(int c) - { - return (c >= 0 && c <= 31) || (c == 127); - } - - REDIS_CLIENT_DECL long int bufToLong(const char *str, size_t size); - -private: - enum State { - Start = 0, - StartArray = 1, - - String = 2, - StringLF = 3, - - ErrorString = 4, - ErrorLF = 5, - - Integer = 6, - IntegerLF = 7, - - BulkSize = 8, - BulkSizeLF = 9, - Bulk = 10, - BulkCR = 11, - BulkLF = 12, - - ArraySize = 13, - ArraySizeLF = 14, - }; - - std::stack states; - - long int bulkSize; - std::vector buf; - RedisValue redisValue; - - // temporary variables - std::stack arraySizes; - std::stack arrayValues; - - static const char stringReply = '+'; - static const char errorReply = '-'; - static const char integerReply = ':'; - static const char bulkReply = '$'; - static const char arrayReply = '*'; -}; - -} - -#ifdef REDIS_CLIENT_HEADER_ONLY -#include "redisclient/impl/redisparser.cpp" -#endif - -#endif // REDISCLIENT_REDISPARSER_H diff --git a/src/redisclient/redissyncclient.h b/src/redisclient/redissyncclient.h deleted file mode 100644 index 07f3952..0000000 --- a/src/redisclient/redissyncclient.h +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Copyright (C) Alex Nekipelov (alex@nekipelov.net) - * License: MIT - */ - -#ifndef REDISSYNCCLIENT_REDISCLIENT_H -#define REDISSYNCCLIENT_REDISCLIENT_H - -#include -#include - -#include -#include -#include - -#include "redisclient/impl/redisclientimpl.h" -#include "redisbuffer.h" -#include "redisvalue.h" -#include "config.h" - -namespace redisclient { - -class RedisClientImpl; -class Pipeline; - -class RedisSyncClient : boost::noncopyable { -public: - typedef RedisClientImpl::State State; - - REDIS_CLIENT_DECL RedisSyncClient(boost::asio::io_service &ioService); - REDIS_CLIENT_DECL RedisSyncClient(RedisSyncClient &&other); - REDIS_CLIENT_DECL ~RedisSyncClient(); - - // Connect to redis server - REDIS_CLIENT_DECL void connect( - const boost::asio::ip::tcp::endpoint &endpoint, - boost::system::error_code &ec); - - // Connect to redis server - REDIS_CLIENT_DECL void connect( - const boost::asio::ip::tcp::endpoint &endpoint); - -#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS - REDIS_CLIENT_DECL void connect( - const boost::asio::local::stream_protocol::endpoint &endpoint, - boost::system::error_code &ec); - - REDIS_CLIENT_DECL void connect( - const boost::asio::local::stream_protocol::endpoint &endpoint); -#endif - - // Return true if is connected to redis. - REDIS_CLIENT_DECL bool isConnected() const; - - // disconnect from redis - REDIS_CLIENT_DECL void disconnect(); - - // Set custom error handler. - REDIS_CLIENT_DECL void installErrorHandler( - std::function handler); - - // Execute command on Redis server with the list of arguments. - REDIS_CLIENT_DECL RedisValue command( - std::string cmd, std::deque args); - - // Execute command on Redis server with the list of arguments. - REDIS_CLIENT_DECL RedisValue command( - std::string cmd, std::deque args, - boost::system::error_code &ec); - - // Create pipeline (see Pipeline) - REDIS_CLIENT_DECL Pipeline pipelined(); - - REDIS_CLIENT_DECL RedisValue pipelined( - std::deque> commands, - boost::system::error_code &ec); - - REDIS_CLIENT_DECL RedisValue pipelined( - std::deque> commands); - - // Return connection state. See RedisClientImpl::State. - REDIS_CLIENT_DECL State state() const; - - REDIS_CLIENT_DECL RedisSyncClient &setConnectTimeout( - const boost::posix_time::time_duration &timeout); - REDIS_CLIENT_DECL RedisSyncClient &setCommandTimeout( - const boost::posix_time::time_duration &timeout); - - REDIS_CLIENT_DECL RedisSyncClient &setTcpNoDelay(bool enable); - REDIS_CLIENT_DECL RedisSyncClient &setTcpKeepAlive(bool enable); - -protected: - REDIS_CLIENT_DECL bool stateValid() const; - -private: - std::shared_ptr pimpl; - boost::posix_time::time_duration connectTimeout; - boost::posix_time::time_duration commandTimeout; - bool tcpNoDelay; - bool tcpKeepAlive; -}; - -} - -#ifdef REDIS_CLIENT_HEADER_ONLY -#include "redisclient/impl/redissyncclient.cpp" -#endif - -#endif // REDISSYNCCLIENT_REDISCLIENT_H diff --git a/src/redisclient/redisvalue.h b/src/redisclient/redisvalue.h deleted file mode 100644 index fcca587..0000000 --- a/src/redisclient/redisvalue.h +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Copyright (C) Alex Nekipelov (alex@nekipelov.net) - * License: MIT - */ - -#ifndef REDISCLIENT_REDISVALUE_H -#define REDISCLIENT_REDISVALUE_H - -#include -#include -#include - -#include "config.h" - -namespace redisclient { - -class RedisValue { -public: - struct ErrorTag {}; - - REDIS_CLIENT_DECL RedisValue(); - REDIS_CLIENT_DECL RedisValue(RedisValue &&other); - REDIS_CLIENT_DECL RedisValue(int64_t i); - REDIS_CLIENT_DECL RedisValue(const char *s); - REDIS_CLIENT_DECL RedisValue(const std::string &s); - REDIS_CLIENT_DECL RedisValue(std::vector buf); - REDIS_CLIENT_DECL RedisValue(std::vector buf, struct ErrorTag); - REDIS_CLIENT_DECL RedisValue(std::vector array); - - - RedisValue(const RedisValue &) = default; - RedisValue& operator = (const RedisValue &) = default; - RedisValue& operator = (RedisValue &&) = default; - - // Return the value as a std::string if - // type is a byte string; otherwise returns an empty std::string. - REDIS_CLIENT_DECL std::string toString() const; - - // Return the value as a std::vector if - // type is a byte string; otherwise returns an empty std::vector. - REDIS_CLIENT_DECL std::vector toByteArray() const; - - // Return the value as a std::vector if - // type is an int; otherwise returns 0. - REDIS_CLIENT_DECL int64_t toInt() const; - - // Return the value as an array if type is an array; - // otherwise returns an empty array. - REDIS_CLIENT_DECL std::vector toArray() const; - - // Return the string representation of the value. Use - // for dump content of the value. - REDIS_CLIENT_DECL std::string inspect() const; - - // Return true if value not a error - REDIS_CLIENT_DECL bool isOk() const; - // Return true if value is a error - REDIS_CLIENT_DECL bool isError() const; - - // Return true if this is a null. - REDIS_CLIENT_DECL bool isNull() const; - // Return true if type is an int - REDIS_CLIENT_DECL bool isInt() const; - // Return true if type is an array - REDIS_CLIENT_DECL bool isArray() const; - // Return true if type is a string/byte array. Alias for isString(); - REDIS_CLIENT_DECL bool isByteArray() const; - // Return true if type is a string/byte array. Alias for isByteArray(). - REDIS_CLIENT_DECL bool isString() const; - - // Methods for increasing perfomance - // Throws: boost::bad_get if the type does not match - REDIS_CLIENT_DECL std::vector &getByteArray(); - REDIS_CLIENT_DECL const std::vector &getByteArray() const; - REDIS_CLIENT_DECL std::vector &getArray(); - REDIS_CLIENT_DECL const std::vector &getArray() const; - - - REDIS_CLIENT_DECL bool operator == (const RedisValue &rhs) const; - REDIS_CLIENT_DECL bool operator != (const RedisValue &rhs) const; - -protected: - template - T castTo() const; - - template - bool typeEq() const; - -private: - struct NullTag { - inline bool operator == (const NullTag &) const { - return true; - } - }; - - - boost::variant, std::vector > value; - bool error; -}; - - -template -T RedisValue::castTo() const -{ - if( value.type() == typeid(T) ) - return boost::get(value); - else - return T(); -} - -template -bool RedisValue::typeEq() const -{ - if( value.type() == typeid(T) ) - return true; - else - return false; -} - -} - -#ifdef REDIS_CLIENT_HEADER_ONLY -#include "redisclient/impl/redisvalue.cpp" -#endif - -#endif // REDISCLIENT_REDISVALUE_H diff --git a/src/redisclient/version.h b/src/redisclient/version.h deleted file mode 100644 index 27a234d..0000000 --- a/src/redisclient/version.h +++ /dev/null @@ -1,11 +0,0 @@ -/* - * Copyright (C) Alex Nekipelov (alex@nekipelov.net) - * License: MIT - */ - -#ifndef REDISCLIENT_VERSION_H -#define REDISCLIENT_VERSION_H - -#define REDIS_CLIENT_VERSION 10002 // 1.0.2 - -#endif // REDISCLIENT_VERSION_H diff --git a/src/sync_client.cpp b/src/sync_client.cpp new file mode 100644 index 0000000..c79cc6f --- /dev/null +++ b/src/sync_client.cpp @@ -0,0 +1,87 @@ +#ifndef REDIS_SYNC_CLIENT_SOURCE +#define REDIS_SYNC_CLIENT_SOURCE + +#include +#include + +#include "redis/sync_client.hpp" + +namespace redis { + sync_client::sync_client(asio::io_context &ioService) : pimpl(std::make_shared(ioService)) { + pimpl->errorsubscribe_handler = std::bind(&base_client::defaulErrorsubscribe_handler, std::placeholders::_1); + } + + sync_client::~sync_client() { + pimpl->close(); + } + + bool sync_client::connect(const asio::ip::tcp::endpoint &endpoint, std::string &errmsg) { + asio::error_code ec; + pimpl->socket.open(endpoint.protocol(), ec); + + if(!ec) { + pimpl->socket.set_option(asio::ip::tcp::no_delay(true), ec); + + if(!ec) { + pimpl->socket.connect(endpoint, ec); + } + } + + if(!ec) { + pimpl->state = State::Connected; + return true; + } else { + errmsg = ec.message(); + return false; + } + } + + bool sync_client::connect(const asio::ip::address &address, unsigned short port, std::string &errmsg) { + asio::ip::tcp::endpoint endpoint(address, port); + return connect(endpoint, errmsg); + } + + void sync_client::installErrorsubscribe_handler(std::function handler) { + pimpl->errorsubscribe_handler = std::move(handler); + } + + value sync_client::command(const std::string &cmd, std::deque args) { + if(stateValid()) { + args.emplace_front(cmd); + + return pimpl->doSyncCommand(args); + } else { + return value(); + } + } + + value sync_client::command(const std::string &cmd) { + if (stateValid()) { + std::deque args; + args.emplace_front(cmd); + return pimpl->doSyncCommand(args); + } else { + return value(); + } + } + + sync_client::State sync_client::state() const { + return pimpl->getState(); + } + + bool sync_client::stateValid() const { + assert( state() == State::Connected ); + + if(state() != State::Connected) { + std::stringstream ss; + ss << "RedisClient::command called with invalid state " << to_string(state()); + + pimpl->errorsubscribe_handler(ss.str()); + return false; + } + + return true; + } +} + +#endif diff --git a/src/value.cpp b/src/value.cpp new file mode 100644 index 0000000..4debfd9 --- /dev/null +++ b/src/value.cpp @@ -0,0 +1,124 @@ +#ifndef REDIS_VALUE_SOURCE +#define REDIS_VALUE_SOURCE + +#include + +#include "redis/value.hpp" + +namespace redis { + value::value() : _value(NullTag()), _error(false) { + } + + value::value(value &&other) : _value(std::move(other._value)), _error(other._error) { + } + + value::value(int64_t i) : _value(i), _error(false) { + } + + value::value(const char *s) : _value( std::vector(s, s + strlen(s)) ), _error(false) { + } + + value::value(const std::string &s) : _value( std::vector(s.begin(), s.end()) ), _error(false) { + } + + value::value(std::vector buf) : _value(std::move(buf)), _error(false) { + } + + value::value(std::vector buf, struct ErrorTag) : _value(std::move(buf)), _error(true) { + } + + value::value(std::vector array) : _value(std::move(array)), _error(false) { + } + + std::vector value::toArray() const { + return castTo< std::vector >(); + } + + std::string value::toString() const { + const std::vector &buf = toByteArray(); + return std::string(buf.begin(), buf.end()); + } + + std::vector value::toByteArray() const { + return castTo >(); + } + + int64_t value::toInt() const { + return castTo(); + } + + std::string value::inspect() const { + if (isError()) { + static std::string err = "_error: "; + std::string result; + + result = err; + result += toString(); + + return result; + } else if(isNull()) { + static std::string null = "(null)"; + return null; + } else if(isInt()) { + return std::to_string(toInt()); + } else if(isString()) { + return toString(); + } else { + std::vector values = toArray(); + std::string result = "["; + + if(values.empty() == false) { + for(size_t i = 0; i < values.size(); ++i) { + result += values[i].inspect(); + result += ", "; + } + + result.resize(result.size() - 1); + result[result.size() - 1] = ']'; + } else { + result += ']'; + } + + return result; + } + } + + bool value::isOk() const { + return !isError(); + } + + bool value::isError() const { + return _error; + } + + bool value::isNull() const { + return typeEq(); + } + + bool value::isInt() const { + return typeEq(); + } + + bool value::isString() const { + return typeEq >(); + } + + bool value::isByteArray() const { + return typeEq >(); + } + + bool value::isArray() const { + return typeEq< std::vector >(); + } + + bool value::operator == (const value &rhs) const { + return _value == rhs._value; + } + + bool value::operator != (const value &rhs) const { + return !(_value == rhs._value); + } +} + +#endif +