Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add http compression #9

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 99 additions & 91 deletions src/blaze.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
#include "../vendor/rapidjson/include/rapidjson/filewritestream.h"
#include "../vendor/rapidjson/include/rapidjson/writer.h"

#define DEFAULT_SIZE 5000
#define DEFAULT_SIZE 5000
#define DEFAULT_SLICES 5
#define WRITE_BUF_SIZE 65536

Expand All @@ -28,12 +28,12 @@ struct auth_options

// Settings handed (by value) to one dump() worker thread.
// Each member is declared exactly once; the interleaved old/new diff lines
// previously declared most members twice, which is a redeclaration error.
struct dump_options
{
    std::string  host;      // server base URL, as assigned from main()
    std::string  index;     // name of the index to dump
    auth_options auth;      // credentials / TLS settings forwarded to the HTTP helper
    int          slice_id;  // emitted as "slice.id" in the initial query
    int          slice_max; // emitted as "slice.max" in the initial query
    int          size;      // emitted as "size" in the initial query
};

struct thread_state
Expand All @@ -43,41 +43,45 @@ struct thread_state

// Bundles one worker thread with the slice it was started for and the
// thread_state object whose address is passed to dump().
// Each member is declared exactly once; the interleaved old/new diff lines
// previously declared slice_id and thread twice.
struct thread_container
{
    int          slice_id; // index of the slice this worker handles
    thread_state state;    // handed to dump() by pointer when the thread starts
    std::thread  thread;   // the worker itself; joined by main()
};

// libcurl write callback (installed via CURLOPT_WRITEFUNCTION): appends the
// received chunk to the std::vector<char> that was registered as CURLOPT_WRITEDATA.
//
// buffer - start of the received bytes
// size   - size of one element
// nmemb  - number of elements; the chunk is size * nmemb bytes long
// userp  - pointer to the destination std::vector<char>
//
// Returns the number of bytes consumed, which is always the whole chunk.
size_t write_data(
    void *buffer,
    size_t size,
    size_t nmemb,
    void *userp)
{
    // void* -> T* is a well-defined static_cast; no reinterpret_cast needed.
    std::vector<char> *data = static_cast<std::vector<char> *>(userp);

    const char *real_buffer = static_cast<const char *>(buffer);
    size_t real_size = size * nmemb;
    data->insert(data->end(), real_buffer, real_buffer + real_size);
    return real_size;
}

bool get_or_post_data(
CURL * crl,
std::string const & url,
auth_options const & auth,
std::vector<char> * data,
long * response_code,
std::string * error,
std::string body = "")
CURL *crl,
std::string const &url,
auth_options const &auth,
std::vector<char> *data,
long *response_code,
std::string *error,
std::string body = "")
{
curl_slist* headers = nullptr;
curl_slist *headers = nullptr;
headers = curl_slist_append(headers, "Content-Type: application/json");

curl_easy_setopt(crl, CURLOPT_HTTPHEADER, headers);
curl_easy_setopt(crl, CURLOPT_URL, url.c_str());
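// Ask the server for a compressed response; whether it honours the request is up to the server.
// NOTE(review): libcurl only decompresses the body transparently when CURLOPT_ACCEPT_ENCODING
// is set. A hand-added Accept-Encoding header leaves an encoded body as-is, so the JSON parser
// would receive compressed bytes. Prefer curl_easy_setopt(crl, CURLOPT_ACCEPT_ENCODING, "").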
headers = curl_slist_append(headers, "Accept-Encoding: deflate, compress, gzip");

curl_easy_setopt(crl, CURLOPT_HTTPHEADER, headers);
curl_easy_setopt(crl, CURLOPT_URL, url.c_str());
curl_easy_setopt(crl, CURLOPT_WRITEFUNCTION, &write_data);
curl_easy_setopt(crl, CURLOPT_WRITEDATA, reinterpret_cast<void*>(data));
curl_easy_setopt(crl, CURLOPT_WRITEDATA, reinterpret_cast<void *>(data));

if (auth.insecure)
{
Expand All @@ -89,7 +93,7 @@ bool get_or_post_data(
{
std::string user_pass = auth.user + ":" + auth.pass;
curl_easy_setopt(crl, CURLOPT_HTTPAUTH, CURLAUTH_BASIC);
curl_easy_setopt(crl, CURLOPT_USERPWD, user_pass.c_str());
curl_easy_setopt(crl, CURLOPT_USERPWD, user_pass.c_str());
}

if (!body.empty())
Expand All @@ -111,37 +115,37 @@ bool get_or_post_data(
}

void write_document(
rapidjson::Document & document,
int * hits_count,
std::string * scroll_id)
rapidjson::Document &document,
int *hits_count,
std::string *scroll_id)
{
std::unique_lock<std::mutex> lock(mtx_out);
std::unique_lock<std::mutex> lock(mtx_out);

static char buffer[WRITE_BUF_SIZE];
static char buffer[WRITE_BUF_SIZE];
static rapidjson::FileWriteStream stream(stdout, buffer, sizeof(buffer));

// Epic const unfolding.
auto const& scroll_id_value = document["_scroll_id"];
auto const& hits_object_value = document["hits"];
auto const& hits_object = hits_object_value.GetObject();
auto const& hits_value = hits_object["hits"];
auto const& hits = hits_value.GetArray();
auto const &scroll_id_value = document["_scroll_id"];
auto const &hits_object_value = document["hits"];
auto const &hits_object = hits_object_value.GetObject();
auto const &hits_value = hits_object["hits"];
auto const &hits = hits_value.GetArray();

// Shared allocator
auto& allocator = document.GetAllocator();
auto writer = rapidjson::Writer<rapidjson::FileWriteStream>(stream);
auto &allocator = document.GetAllocator();
auto writer = rapidjson::Writer<rapidjson::FileWriteStream>(stream);

for (rapidjson::Value const& hit : hits)
for (rapidjson::Value const &hit : hits)
{
auto meta_index = rapidjson::Value(rapidjson::kObjectType);
auto meta_index_id = rapidjson::Value();
auto meta_index = rapidjson::Value(rapidjson::kObjectType);
auto meta_index_id = rapidjson::Value();
auto meta_index_type = rapidjson::Value();
auto meta_object = rapidjson::Value(rapidjson::kObjectType);
auto meta_object = rapidjson::Value(rapidjson::kObjectType);

meta_index_id.SetString(hit["_id"].GetString(), allocator);
meta_index_type.SetString(hit["_type"].GetString(), allocator);

meta_index.AddMember("_id", meta_index_id, allocator);
meta_index.AddMember("_id", meta_index_id, allocator);
meta_index.AddMember("_type", meta_index_type, allocator);

meta_object.AddMember("index", meta_index, allocator);
Expand All @@ -160,13 +164,13 @@ void write_document(
writer.Reset(stream);
}

*scroll_id = scroll_id_value.GetString();
*scroll_id = scroll_id_value.GetString();
*hits_count = hits.Size();
}

void output_parser_error(
rapidjson::Document const& doc,
std::ostream & stream)
rapidjson::Document const &doc,
std::ostream &stream)
{
stream << "JSON parsing failed with code: "
<< doc.GetParseError()
Expand All @@ -175,22 +179,25 @@ void output_parser_error(
}

void dump(
dump_options const& options,
thread_state * state)
dump_options const &options,
thread_state *state)
{
CURL* crl = curl_easy_init();
CURL *crl = curl_easy_init();

std::string query = "{\n"
"\"size\": " + std::to_string(options.size) + ",\n"
"\"slice\": {\n"
"\"id\": " + std::to_string(options.slice_id) + ",\n"
"\"max\": " + std::to_string(options.slice_max) + "\n"
"}\n"
"}";
"\"size\": " +
std::to_string(options.size) + ",\n"
"\"slice\": {\n"
"\"id\": " +
std::to_string(options.slice_id) + ",\n"
"\"max\": " +
std::to_string(options.slice_max) + "\n"
"}\n"
"}";

std::vector<char> buffer;
long response_code;
std::string error;
long response_code;
std::string error;

bool res = get_or_post_data(
crl,
Expand Down Expand Up @@ -222,7 +229,7 @@ void dump(
}

std::string scroll_id;
int hits_count;
int hits_count;

write_document(
doc,
Expand All @@ -232,9 +239,10 @@ void dump(
do
{
query = "{\n"
"\"scroll\": \"1m\",\n"
"\"scroll_id\": \"" + scroll_id + "\"\n"
"}\n";
"\"scroll\": \"1m\",\n"
"\"scroll_id\": \"" +
scroll_id + "\"\n"
"}\n";

buffer.clear();

Expand Down Expand Up @@ -277,16 +285,16 @@ void dump(
}

int64_t count_documents(
std::string const& host,
std::string const& index,
auth_options const& auth)
std::string const &host,
std::string const &index,
auth_options const &auth)
{
CURL * crl = curl_easy_init();
long response_code;
rapidjson::Document doc;
std::string url = host + "/" + index + "/_count";
std::string error;
std::vector<char> buffer;
CURL *crl = curl_easy_init();
long response_code;
rapidjson::Document doc;
std::string url = host + "/" + index + "/_count";
std::string error;
std::vector<char> buffer;

bool res = get_or_post_data(
crl,
Expand Down Expand Up @@ -314,19 +322,19 @@ int64_t count_documents(
}

int dump_mappings(
std::string const& host,
std::string const& index,
auth_options const& auth)
std::string const &host,
std::string const &index,
auth_options const &auth)
{
static char write_buffer[WRITE_BUF_SIZE];
static char write_buffer[WRITE_BUF_SIZE];
static rapidjson::FileWriteStream stream(stdout, write_buffer, sizeof(write_buffer));

CURL * crl = curl_easy_init();
long response_code;
rapidjson::Document doc;
std::string url = host + "/" + index + "/_mapping";
std::string error;
std::vector<char> buffer;
CURL *crl = curl_easy_init();
long response_code;
rapidjson::Document doc;
std::string url = host + "/" + index + "/_mapping";
std::string error;
std::vector<char> buffer;

bool res = get_or_post_data(
crl,
Expand Down Expand Up @@ -361,8 +369,8 @@ int dump_mappings(
}

int main(
int argc,
char * argv[])
int argc,
char *argv[])
{
curl_global_init(CURL_GLOBAL_ALL);

Expand Down Expand Up @@ -431,23 +439,23 @@ int main(
for (int i = 0; i < slices; i++)
{
dump_options opts;
opts.host = host;
opts.index = index;
opts.auth = auth;
opts.size = size;
opts.slice_id = i;
opts.host = host;
opts.index = index;
opts.auth = auth;
opts.size = size;
opts.slice_id = i;
opts.slice_max = slices;

auto cnt = std::unique_ptr<thread_container>(new thread_container());
cnt->slice_id = i;
cnt->thread = std::thread(dump, opts, &cnt->state);
auto cnt = std::unique_ptr<thread_container>(new thread_container());
cnt->slice_id = i;
cnt->thread = std::thread(dump, opts, &cnt->state);

threads.push_back(std::move(cnt));
}

int exit_code = 0;

for (auto& cnt : threads)
for (auto &cnt : threads)
{
cnt->thread.join();

Expand Down