Skip to content

Commit

Permalink
support http client
Browse files Browse the repository at this point in the history
  • Loading branch information
darionyaphet committed Jul 25, 2019
1 parent d35ff9c commit 623e80b
Show file tree
Hide file tree
Showing 33 changed files with 343 additions and 145 deletions.
1 change: 1 addition & 0 deletions src/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ add_subdirectory(concurrent)
add_subdirectory(thread)
add_subdirectory(process)
add_subdirectory(hdfs)
add_subdirectory(http)
add_subdirectory(stats)
add_subdirectory(filter)
add_subdirectory(test)
5 changes: 5 additions & 0 deletions src/common/http/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
add_library(http_client_obj OBJECT HttpClient.cpp)

add_dependencies(http_client_obj process_obj base_obj)

add_subdirectory(test)
25 changes: 25 additions & 0 deletions src/common/http/HttpClient.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/* Copyright (c) 2019 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include "http/HttpClient.h"
#include "process/ProcessUtils.h"

namespace nebula {
namespace http {

StatusOr<std::string> HttpClient::get(const std::string& path) {
auto command = folly::stringPrintf("/usr/bin/curl -G %s", path.c_str());
LOG(INFO) << "HTTP Get Command: " << command;
auto result = nebula::ProcessUtils::runCommand(command.c_str());
if (result.ok()) {
return result.value();
} else {
return Status::Error(folly::stringPrintf("Http Get Failed: %s", path.c_str()));
}
}

} // namespace http
} // namespace nebula
29 changes: 29 additions & 0 deletions src/common/http/HttpClient.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/* Copyright (c) 2019 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#ifndef COMMON_HTTPCLIENT_H
#define COMMON_HTTPCLIENT_H

#include "base/Base.h"
#include "base/StatusOr.h"

namespace nebula {
namespace http {

class HttpClient {
public:
HttpClient() = delete;

~HttpClient() = default;

static StatusOr<std::string> get(const std::string& path);
};

} // namespace http
} // namespace nebula

#endif // COMMON_HTTPCLIENT_H

20 changes: 20 additions & 0 deletions src/common/http/test/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
add_executable(
http_client_test
HttpClientTest.cpp
$<TARGET_OBJECTS:http_client_obj>
$<TARGET_OBJECTS:ws_obj>
$<TARGET_OBJECTS:process_obj>
$<TARGET_OBJECTS:fs_obj>
$<TARGET_OBJECTS:stats_obj>
$<TARGET_OBJECTS:base_obj>
)
nebula_link_libraries(
http_client_test
proxygenhttpserver
proxygenlib
wangle
gtest
gtest_main
)
nebula_add_test(http_client_test)

96 changes: 96 additions & 0 deletions src/common/http/test/HttpClientTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/* Copyright (c) 2019 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include "base/Base.h"
#include "http/HttpClient.h"
#include <gtest/gtest.h>
#include "webservice/Common.h"
#include "webservice/WebService.h"
#include "webservice/test/TestUtils.h"
#include "proxygen/httpserver/RequestHandler.h"
#include <proxygen/httpserver/ResponseBuilder.h>

namespace nebula {
namespace http {

class HttpClientHandler : public proxygen::RequestHandler {
public:
HttpClientHandler() = default;

void onRequest(std::unique_ptr<proxygen::HTTPMessage>) noexcept override {
}

void onBody(std::unique_ptr<folly::IOBuf>) noexcept override {
}

void onEOM() noexcept override {
proxygen::ResponseBuilder(downstream_)
.status(WebServiceUtils::to(HttpStatusCode::OK),
WebServiceUtils::toString(HttpStatusCode::OK))
.body("HttpClientHandler successfully")
.sendWithEOM();
}

void onUpgrade(proxygen::UpgradeProtocol) noexcept override {
}

void requestComplete() noexcept override {
delete this;
}

void onError(proxygen::ProxygenError error) noexcept override {
LOG(ERROR) << "HttpClientHandler Error: "
<< proxygen::getErrorString(error);
}
};
class HttpClientTestEnv : public ::testing::Environment {
public:
void SetUp() override {
FLAGS_ws_http_port = 0;
FLAGS_ws_h2_port = 0;
LOG(INFO) << "Starting web service...";

WebService::registerHandler("/path", [] {
return new HttpClientHandler();
});
auto status = WebService::start();
ASSERT_TRUE(status.ok()) << status;
}

void TearDown() override {
WebService::stop();
VLOG(1) << "Web service stopped";
}
};

TEST(HttpClient, get) {
{
auto url = folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(),
FLAGS_ws_http_port, "/path");
auto result = HttpClient::get(url);
ASSERT_TRUE(result.ok());
ASSERT_EQ("HttpClientHandler successfully", result.value());
}
{
auto url = folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(),
FLAGS_ws_http_port, "/not_exist");
auto result = HttpClient::get(url);
ASSERT_TRUE(result.ok());
ASSERT_EQ("", result.value());
}
}

} // namespace http
} // namespace nebula

int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
folly::init(&argc, &argv, true);
google::SetStderrLogging(google::INFO);

::testing::AddGlobalTestEnvironment(new nebula::http::HttpClientTestEnv());
return RUN_ALL_TESTS();
}
3 changes: 3 additions & 0 deletions src/daemons/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ add_executable(
$<TARGET_OBJECTS:storage_client>
$<TARGET_OBJECTS:meta_thrift_obj>
$<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:http_client_obj>
$<TARGET_OBJECTS:fs_obj>
$<TARGET_OBJECTS:stats_obj>
$<TARGET_OBJECTS:thread_obj>
Expand Down Expand Up @@ -51,6 +52,7 @@ add_executable(
$<TARGET_OBJECTS:dataman_obj>
$<TARGET_OBJECTS:schema_obj>
$<TARGET_OBJECTS:hdfs_helper_obj>
$<TARGET_OBJECTS:http_client_obj>
$<TARGET_OBJECTS:filter_obj>
$<TARGET_OBJECTS:base_obj>
$<TARGET_OBJECTS:thrift_obj>
Expand Down Expand Up @@ -87,6 +89,7 @@ add_executable(
$<TARGET_OBJECTS:raftex_thrift_obj>
$<TARGET_OBJECTS:wal_obj>
$<TARGET_OBJECTS:hdfs_helper_obj>
$<TARGET_OBJECTS:http_client_obj>
$<TARGET_OBJECTS:base_obj>
$<TARGET_OBJECTS:thrift_obj>
$<TARGET_OBJECTS:network_obj>
Expand Down
12 changes: 10 additions & 2 deletions src/daemons/MetaDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ DEFINE_string(peers, "", "It is a list of IPs split by comma,"
"If empty, it means replica is 1");
DEFINE_string(local_ip, "", "Local ip speicified for NetworkUtils::getLocalIP");
DEFINE_int32(num_io_threads, 16, "Number of IO threads");
DEFINE_int32(meta_download_thread_num, 3, "Number of meta daemon's download thread");
DECLARE_string(part_man_type);

DEFINE_string(pid_file, "pids/nebula-metad.pid", "File to hold the process id");
Expand Down Expand Up @@ -117,13 +118,20 @@ int main(int argc, char *argv[]) {
std::make_unique<nebula::hdfs::HdfsCommandHelper>();
auto *helperPtr = helper.get();

std::unique_ptr<nebula::thread::GenericThreadPool> pool =
std::make_unique<nebula::thread::GenericThreadPool>();
pool->start(FLAGS_meta_download_thread_num);
LOG(INFO) << "Download Thread Pool started";
auto* poolPtr = pool.get();


LOG(INFO) << "Starting Meta HTTP Service";
nebula::WebService::registerHandler("/status", [] {
return new nebula::meta::MetaHttpStatusHandler();
});
nebula::WebService::registerHandler("/download-dispatch", [kvstore_, helperPtr] {
nebula::WebService::registerHandler("/download-dispatch", [kvstore_, helperPtr, poolPtr] {
auto handler = new nebula::meta::MetaHttpDownloadHandler();
handler->init(kvstore_, helperPtr);
handler->init(kvstore_, helperPtr, poolPtr);
return handler;
});
nebula::WebService::registerHandler("/ingest-dispatch", [kvstore_] {
Expand Down
11 changes: 9 additions & 2 deletions src/daemons/StorageDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ DEFINE_string(store_type, "nebula",
"Which type of KVStore to be used by the storage daemon."
" Options can be \"nebula\", \"hbase\", etc.");
DEFINE_int32(num_io_threads, 16, "Number of IO threads");
DEFINE_int32(storage_download_thread_num, 3, "Number of storage daemon's download thread");

using nebula::operator<<;
using nebula::Status;
Expand Down Expand Up @@ -167,13 +168,19 @@ int main(int argc, char *argv[]) {
std::make_unique<nebula::hdfs::HdfsCommandHelper>();
auto *helperPtr = helper.get();

std::unique_ptr<nebula::thread::GenericThreadPool> pool =
std::make_unique<nebula::thread::GenericThreadPool>();
pool->start(FLAGS_storage_download_thread_num);
LOG(INFO) << "Download Thread Pool started";
auto* poolPtr = pool.get();

LOG(INFO) << "Starting Storage HTTP Service";
nebula::WebService::registerHandler("/status", [] {
return new nebula::storage::StorageHttpStatusHandler();
});
nebula::WebService::registerHandler("/download", [helperPtr] {
nebula::WebService::registerHandler("/download", [helperPtr, poolPtr] {
auto handler = new nebula::storage::StorageHttpDownloadHandler();
handler->init(helperPtr);
handler->init(helperPtr, poolPtr, FLAGS_data_path);
return handler;
});
nebula::WebService::registerHandler("/ingest", [kvstore_] {
Expand Down
26 changes: 11 additions & 15 deletions src/graph/DownloadExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,17 @@
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include "base/StatusOr.h"
#include "http/HttpClient.h"
#include "graph/DownloadExecutor.h"
#include "process/ProcessUtils.h"
#include "base/StatusOr.h"
#include <folly/executors/IOThreadPoolExecutor.h>
#include <folly/executors/CPUThreadPoolExecutor.h>
#include "webservice/Common.h"
#include "webservice/WebService.h"

#include <folly/executors/Async.h>
#include <folly/futures/Future.h>
#include <folly/executors/ThreadedExecutor.h>

DEFINE_int32(meta_http_port, 11000, "Default meta daemon's http port");

namespace nebula {
namespace graph {

Expand All @@ -36,22 +35,19 @@ void DownloadExecutor::execute() {
auto *hdfsHost = sentence_->host();
auto hdfsPort = sentence_->port();
auto *hdfsPath = sentence_->path();
auto *hdfsLocal = sentence_->localPath();
if (hdfsHost == nullptr || hdfsPort == 0 || hdfsPath == nullptr || hdfsLocal == nullptr) {
if (hdfsHost == nullptr || hdfsPort == 0 || hdfsPath == nullptr) {
LOG(ERROR) << "URL Parse Failed";
resp_ = std::make_unique<cpp2::ExecutionResponse>();
onError_(Status::Error("URL Parse Failed"));
return;
}

auto func = [metaHost, hdfsHost, hdfsPort, hdfsPath, hdfsLocal, spaceId]() {
std::string tmp = "%s \"http://%s:%d/%s?host=%s&port=%d&path=%s&local=%s&space=%d\"";
auto command = folly::stringPrintf(tmp.c_str(), "/usr/bin/curl -G", metaHost.c_str(),
FLAGS_meta_http_port, "download-dispatch",
hdfsHost->c_str(), hdfsPort, hdfsPath->c_str(),
hdfsLocal->c_str(), spaceId);
LOG(INFO) << "Download Command: " << command;
auto result = nebula::ProcessUtils::runCommand(command.c_str());
auto func = [metaHost, hdfsHost, hdfsPort, hdfsPath, spaceId]() {
std::string tmp = "http://%s:%d/%s?host=%s&port=%d&path=%s&space=%d";
auto url = folly::stringPrintf(tmp.c_str(), metaHost.c_str(), FLAGS_ws_meta_http_port,
"download-dispatch", hdfsHost->c_str(),
hdfsPort, hdfsPath->c_str(), spaceId);
auto result = http::HttpClient::get(url);
if (result.ok() && result.value() == "SSTFile dispatch successfully") {
LOG(INFO) << "Download Successfully";
return true;
Expand Down
19 changes: 9 additions & 10 deletions src/graph/IngestExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include "http/HttpClient.h"
#include "graph/IngestExecutor.h"
#include "process/ProcessUtils.h"
#include "webservice/Common.h"
#include "webservice/WebService.h"

#include <folly/executors/Async.h>
#include <folly/futures/Future.h>

DEFINE_int32(meta_ingest_http_port, 11000, "Default meta daemon's http port");

namespace nebula {
namespace graph {

Expand All @@ -29,15 +30,13 @@ void IngestExecutor::execute() {
auto addresses = mc->getAddresses();
auto metaHost = network::NetworkUtils::intToIPv4(addresses[0].first);
auto spaceId = ectx()->rctx()->session()->space();
auto *path = sentence_->path();

auto func = [metaHost, path, spaceId]() {
std::string tmp = "/usr/bin/curl -G \"http://%s:%d/%s?path=%s&space=%d\"";
auto command = folly::stringPrintf(tmp.c_str(), metaHost.c_str(),
FLAGS_meta_ingest_http_port,
"ingest-dispatch", path->c_str(), spaceId);
LOG(INFO) << "Ingest Command: " << command;
auto result = nebula::ProcessUtils::runCommand(command.c_str());
auto func = [metaHost, spaceId]() {
std::string tmp = "http://%s:%d/%s?path=%s&space=%d";
auto url = folly::stringPrintf(tmp.c_str(), metaHost.c_str(),
FLAGS_ws_meta_http_port,
"ingest-dispatch", spaceId);
auto result = http::HttpClient::get(url);
if (result.ok() && result.value() == "SSTFile ingest successfully") {
LOG(INFO) << "Ingest Successfully";
return true;
Expand Down
Loading

0 comments on commit 623e80b

Please sign in to comment.