diff --git a/src/blaze.cpp b/src/blaze.cpp index 575a0e4..246fa71 100644 --- a/src/blaze.cpp +++ b/src/blaze.cpp @@ -12,7 +12,7 @@ #include "../vendor/rapidjson/include/rapidjson/filewritestream.h" #include "../vendor/rapidjson/include/rapidjson/writer.h" -#define DEFAULT_SIZE 5000 +#define DEFAULT_SIZE 5000 #define DEFAULT_SLICES 5 #define WRITE_BUF_SIZE 65536 @@ -28,12 +28,12 @@ struct auth_options struct dump_options { - std::string host; - std::string index; + std::string host; + std::string index; auth_options auth; - int slice_id; - int slice_max; - int size; + int slice_id; + int slice_max; + int size; }; struct thread_state @@ -43,41 +43,45 @@ struct thread_state struct thread_container { - int slice_id; + int slice_id; thread_state state; - std::thread thread; + std::thread thread; }; size_t write_data( - void * buffer, - size_t size, - size_t nmemb, - void * userp) + void *buffer, + size_t size, + size_t nmemb, + void *userp) { - std::vector* data = reinterpret_cast*>(userp); + std::vector *data = reinterpret_cast *>(userp); - const char* real_buffer = reinterpret_cast(buffer); + const char *real_buffer = reinterpret_cast(buffer); size_t real_size = size * nmemb; data->insert(data->end(), real_buffer, real_buffer + real_size); return real_size; } bool get_or_post_data( - CURL * crl, - std::string const & url, - auth_options const & auth, - std::vector * data, - long * response_code, - std::string * error, - std::string body = "") + CURL *crl, + std::string const &url, + auth_options const &auth, + std::vector *data, + long *response_code, + std::string *error, + std::string body = "") { - curl_slist* headers = nullptr; + curl_slist *headers = nullptr; headers = curl_slist_append(headers, "Content-Type: application/json"); - curl_easy_setopt(crl, CURLOPT_HTTPHEADER, headers); - curl_easy_setopt(crl, CURLOPT_URL, url.c_str()); + // enable compression, whether it's supported or not is at the discretion of the server + // curl should handle this transparently. + headers = curl_slist_append(headers, "Accept-Encoding: deflate, compress, gzip"); + + curl_easy_setopt(crl, CURLOPT_HTTPHEADER, headers); + curl_easy_setopt(crl, CURLOPT_URL, url.c_str()); curl_easy_setopt(crl, CURLOPT_WRITEFUNCTION, &write_data); - curl_easy_setopt(crl, CURLOPT_WRITEDATA, reinterpret_cast(data)); + curl_easy_setopt(crl, CURLOPT_WRITEDATA, reinterpret_cast(data)); if (auth.insecure) { @@ -89,7 +93,7 @@ bool get_or_post_data( { std::string user_pass = auth.user + ":" + auth.pass; curl_easy_setopt(crl, CURLOPT_HTTPAUTH, CURLAUTH_BASIC); - curl_easy_setopt(crl, CURLOPT_USERPWD, user_pass.c_str()); + curl_easy_setopt(crl, CURLOPT_USERPWD, user_pass.c_str()); } if (!body.empty()) @@ -111,37 +115,37 @@ bool get_or_post_data( } void write_document( - rapidjson::Document & document, - int * hits_count, - std::string * scroll_id) + rapidjson::Document &document, + int *hits_count, + std::string *scroll_id) { - std::unique_lock lock(mtx_out); + std::unique_lock lock(mtx_out); - static char buffer[WRITE_BUF_SIZE]; + static char buffer[WRITE_BUF_SIZE]; static rapidjson::FileWriteStream stream(stdout, buffer, sizeof(buffer)); // Epic const unfolding. - auto const& scroll_id_value = document["_scroll_id"]; - auto const& hits_object_value = document["hits"]; - auto const& hits_object = hits_object_value.GetObject(); - auto const& hits_value = hits_object["hits"]; - auto const& hits = hits_value.GetArray(); + auto const &scroll_id_value = document["_scroll_id"]; + auto const &hits_object_value = document["hits"]; + auto const &hits_object = hits_object_value.GetObject(); + auto const &hits_value = hits_object["hits"]; + auto const &hits = hits_value.GetArray(); // Shared allocator - auto& allocator = document.GetAllocator(); - auto writer = rapidjson::Writer(stream); + auto &allocator = document.GetAllocator(); + auto writer = rapidjson::Writer(stream); - for (rapidjson::Value const& hit : hits) + for (rapidjson::Value const &hit : hits) { - auto meta_index = rapidjson::Value(rapidjson::kObjectType); - auto meta_index_id = rapidjson::Value(); + auto meta_index = rapidjson::Value(rapidjson::kObjectType); + auto meta_index_id = rapidjson::Value(); auto meta_index_type = rapidjson::Value(); - auto meta_object = rapidjson::Value(rapidjson::kObjectType); + auto meta_object = rapidjson::Value(rapidjson::kObjectType); meta_index_id.SetString(hit["_id"].GetString(), allocator); meta_index_type.SetString(hit["_type"].GetString(), allocator); - meta_index.AddMember("_id", meta_index_id, allocator); + meta_index.AddMember("_id", meta_index_id, allocator); meta_index.AddMember("_type", meta_index_type, allocator); meta_object.AddMember("index", meta_index, allocator); @@ -160,13 +164,13 @@ void write_document( writer.Reset(stream); } - *scroll_id = scroll_id_value.GetString(); + *scroll_id = scroll_id_value.GetString(); *hits_count = hits.Size(); } void output_parser_error( - rapidjson::Document const& doc, - std::ostream & stream) + rapidjson::Document const &doc, + std::ostream &stream) { stream << "JSON parsing failed with code: " << doc.GetParseError() @@ -175,22 +179,25 @@ void output_parser_error( } void dump( - dump_options const& options, - thread_state * state) + dump_options const &options, + thread_state *state) { - CURL* crl = curl_easy_init(); + CURL *crl = curl_easy_init(); std::string query = "{\n" - "\"size\": " + std::to_string(options.size) + ",\n" - "\"slice\": {\n" - "\"id\": " + std::to_string(options.slice_id) + ",\n" - "\"max\": " + std::to_string(options.slice_max) + "\n" - "}\n" - "}"; + "\"size\": " + + std::to_string(options.size) + ",\n" + "\"slice\": {\n" + "\"id\": " + + std::to_string(options.slice_id) + ",\n" + "\"max\": " + + std::to_string(options.slice_max) + "\n" + "}\n" + "}"; std::vector buffer; - long response_code; - std::string error; + long response_code; + std::string error; bool res = get_or_post_data( crl, @@ -222,7 +229,7 @@ void dump( } std::string scroll_id; - int hits_count; + int hits_count; write_document( doc, @@ -232,9 +239,10 @@ void dump( do { query = "{\n" - "\"scroll\": \"1m\",\n" - "\"scroll_id\": \"" + scroll_id + "\"\n" - "}\n"; + "\"scroll\": \"1m\",\n" + "\"scroll_id\": \"" + + scroll_id + "\"\n" + "}\n"; buffer.clear(); @@ -277,16 +285,16 @@ void dump( } int64_t count_documents( - std::string const& host, - std::string const& index, - auth_options const& auth) + std::string const &host, + std::string const &index, + auth_options const &auth) { - CURL * crl = curl_easy_init(); - long response_code; - rapidjson::Document doc; - std::string url = host + "/" + index + "/_count"; - std::string error; - std::vector buffer; + CURL *crl = curl_easy_init(); + long response_code; + rapidjson::Document doc; + std::string url = host + "/" + index + "/_count"; + std::string error; + std::vector buffer; bool res = get_or_post_data( crl, @@ -314,19 +322,19 @@ int64_t count_documents( } int dump_mappings( - std::string const& host, - std::string const& index, - auth_options const& auth) + std::string const &host, + std::string const &index, + auth_options const &auth) { - static char write_buffer[WRITE_BUF_SIZE]; + static char write_buffer[WRITE_BUF_SIZE]; static rapidjson::FileWriteStream stream(stdout, write_buffer, sizeof(write_buffer)); - CURL * crl = curl_easy_init(); - long response_code; - rapidjson::Document doc; - std::string url = host + "/" + index + "/_mapping"; - std::string error; - std::vector buffer; + CURL *crl = curl_easy_init(); + long response_code; + rapidjson::Document doc; + std::string url = host + "/" + index + "/_mapping"; + std::string error; + std::vector buffer; bool res = get_or_post_data( crl, @@ -361,8 +369,8 @@ int dump_mappings( } int main( - int argc, - char * argv[]) + int argc, + char *argv[]) { curl_global_init(CURL_GLOBAL_ALL); @@ -431,23 +439,23 @@ int main( for (int i = 0; i < slices; i++) { dump_options opts; - opts.host = host; - opts.index = index; - opts.auth = auth; - opts.size = size; - opts.slice_id = i; + opts.host = host; + opts.index = index; + opts.auth = auth; + opts.size = size; + opts.slice_id = i; opts.slice_max = slices; - auto cnt = std::unique_ptr(new thread_container()); - cnt->slice_id = i; - cnt->thread = std::thread(dump, opts, &cnt->state); + auto cnt = std::unique_ptr(new thread_container()); + cnt->slice_id = i; + cnt->thread = std::thread(dump, opts, &cnt->state); threads.push_back(std::move(cnt)); } int exit_code = 0; - for (auto& cnt : threads) + for (auto &cnt : threads) { cnt->thread.join();