Skip to content

Commit

Permalink
Support Ingest SST Files (vesoft-inc#654)
Browse files Browse the repository at this point in the history
It is used to ingest SST files directly after downloading them from HDFS.
  • Loading branch information
darionyaphet authored and dangleptr committed Aug 7, 2019
1 parent 3c2f560 commit c16b72c
Show file tree
Hide file tree
Showing 66 changed files with 1,528 additions and 266 deletions.
1 change: 1 addition & 0 deletions src/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ add_subdirectory(concurrent)
add_subdirectory(thread)
add_subdirectory(process)
add_subdirectory(hdfs)
add_subdirectory(http)
add_subdirectory(stats)
add_subdirectory(filter)
add_subdirectory(test)
6 changes: 6 additions & 0 deletions src/common/fs/FileUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,12 @@ bool FileUtils::makeDir(const std::string& dir) {
return true;
}

/**
 * Tell whether `path` names an existing file-system entry.
 *
 * An empty path is reported as non-existent without touching the file
 * system; otherwise the answer is whether access() with F_OK succeeds.
 */
bool FileUtils::exist(const std::string& path) {
    // The short-circuit keeps access() from ever seeing an empty string.
    return !path.empty() && access(path.c_str(), F_OK) == 0;
}

std::vector<std::string> FileUtils::listAllTypedEntitiesInDir(
const char* dirpath,
Expand Down
2 changes: 2 additions & 0 deletions src/common/fs/FileUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ class FileUtils final {
// It will make the parent directories as needed
// (much like commandline "mkdir -p")
static bool makeDir(const std::string& dir);
// Check whether the given path exists
static bool exist(const std::string& path);

/**
* List all entities in the given directory, whose type matches
Expand Down
5 changes: 5 additions & 0 deletions src/common/http/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Compile HttpClient.cpp into an OBJECT library so that other targets can
# link its object files through $<TARGET_OBJECTS:http_client_obj>.
add_library(http_client_obj OBJECT HttpClient.cpp)

# Make sure process_obj and base_obj are built before http_client_obj.
add_dependencies(http_client_obj process_obj base_obj)

# Pull in the tests that live in the test/ subdirectory.
add_subdirectory(test)
25 changes: 25 additions & 0 deletions src/common/http/HttpClient.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/* Copyright (c) 2019 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include "http/HttpClient.h"
#include "process/ProcessUtils.h"

namespace nebula {
namespace http {

/**
 * Wrap `arg` in single quotes so a POSIX shell treats it as one literal word.
 *
 * Inside single quotes the shell interprets nothing, so the only character
 * needing care is the single quote itself: it is emitted as '\'' (close the
 * quoted run, add an escaped quote, reopen the quoted run).
 */
static std::string quoteForShell(const std::string& arg) {
    std::string quoted;
    quoted.reserve(arg.size() + 2);
    quoted.push_back('\'');
    for (const char ch : arg) {
        if (ch == '\'') {
            quoted.append("'\\''");
        } else {
            quoted.push_back(ch);
        }
    }
    quoted.push_back('\'');
    return quoted;
}

/**
 * Perform an HTTP GET by running /usr/bin/curl through ProcessUtils::runCommand.
 *
 * @param path  The URL to fetch; it is handed to curl verbatim.
 * @return The value produced by runCommand when it succeeds, otherwise an
 *         error Status whose message contains `path`.
 *
 * The URL is shell-quoted before being placed on the command line. The
 * previous form wrapped it in double quotes only, which let `"`, `$`,
 * backticks and backslashes inside the URL be interpreted as shell syntax.
 * NOTE(review): this assumes runCommand passes the string to a shell, as the
 * original quoting implied -- confirm against ProcessUtils.
 */
StatusOr<std::string> HttpClient::get(const std::string& path) {
    auto command = folly::stringPrintf("/usr/bin/curl -G %s", quoteForShell(path).c_str());
    LOG(INFO) << "HTTP Get Command: " << command;
    auto result = nebula::ProcessUtils::runCommand(command.c_str());
    if (!result.ok()) {
        return Status::Error(folly::stringPrintf("Http Get Failed: %s", path.c_str()));
    }
    return result.value();
}

} // namespace http
} // namespace nebula
29 changes: 29 additions & 0 deletions src/common/http/HttpClient.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/* Copyright (c) 2019 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#ifndef COMMON_HTTPCLIENT_H
#define COMMON_HTTPCLIENT_H

#include "base/Base.h"
#include "base/StatusOr.h"

namespace nebula {
namespace http {

/**
 * Static-only helper for issuing HTTP requests.
 *
 * The class cannot be instantiated (its default constructor is deleted);
 * call the static members directly.
 */
class HttpClient {
public:
// Not constructible: the only operation offered is the static get().
HttpClient() = delete;

~HttpClient() = default;

/**
 * Issue an HTTP GET request.
 *
 * @param path  Target of the request. The accompanying unit test passes a
 *              complete URL of the form "http://host:port/route".
 * @return The response text on success, or an error Status when the request
 *         could not be carried out. The accompanying unit test expects an
 *         ok() result with an empty value for a route that is not
 *         registered, so an ok() result does not by itself mean the route
 *         was served.
 */
static StatusOr<std::string> get(const std::string& path);
};

} // namespace http
} // namespace nebula

#endif // COMMON_HTTPCLIENT_H

20 changes: 20 additions & 0 deletions src/common/http/test/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Unit-test executable for HttpClient, built from HttpClientTest.cpp.
# The OBJECTS list names the object libraries compiled into the test and the
# LIBRARIES list names the libraries it is linked against.
# NOTE(review): nebula_add_test is a project-defined helper whose definition
# is not visible here -- confirm the exact meaning of each keyword there.
nebula_add_test(
NAME
http_client_test
SOURCES
HttpClientTest.cpp
OBJECTS
$<TARGET_OBJECTS:http_client_obj>
$<TARGET_OBJECTS:ws_obj>
$<TARGET_OBJECTS:process_obj>
$<TARGET_OBJECTS:fs_obj>
$<TARGET_OBJECTS:stats_obj>
$<TARGET_OBJECTS:base_obj>
LIBRARIES
proxygenhttpserver
proxygenlib
wangle
gtest
gtest_main
)

95 changes: 95 additions & 0 deletions src/common/http/test/HttpClientTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/* Copyright (c) 2019 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include "base/Base.h"
#include "http/HttpClient.h"
#include <gtest/gtest.h>
#include "webservice/Common.h"
#include "webservice/WebService.h"
#include "proxygen/httpserver/RequestHandler.h"
#include <proxygen/httpserver/ResponseBuilder.h>

namespace nebula {
namespace http {

/**
 * Minimal proxygen request handler used by the HttpClient test.
 *
 * It ignores the request headers and body and, once the request has been
 * fully received, answers with status OK and the fixed body
 * "HttpClientHandler successfully".
 *
 * Instances are created with `new` by the handler factory and destroy
 * themselves in whichever terminal callback proxygen delivers
 * (requestComplete() or onError()).
 */
class HttpClientHandler : public proxygen::RequestHandler {
public:
    HttpClientHandler() = default;

    // Request headers are not needed by this handler.
    void onRequest(std::unique_ptr<proxygen::HTTPMessage>) noexcept override {
    }

    // Request body chunks are not needed by this handler.
    void onBody(std::unique_ptr<folly::IOBuf>) noexcept override {
    }

    // End of the request message: send the canned response in one shot.
    void onEOM() noexcept override {
        proxygen::ResponseBuilder(downstream_)
            .status(WebServiceUtils::to(HttpStatusCode::OK),
                    WebServiceUtils::toString(HttpStatusCode::OK))
            .body("HttpClientHandler successfully")
            .sendWithEOM();
    }

    // Protocol upgrades are not supported; nothing to do.
    void onUpgrade(proxygen::UpgradeProtocol) noexcept override {
    }

    // Terminal callback on the success path: release the handler.
    void requestComplete() noexcept override {
        delete this;
    }

    // Terminal callback on the failure path. proxygen invokes no further
    // callbacks after onError(), so the handler must be released here as
    // well; otherwise every failed request leaks one handler object.
    void onError(proxygen::ProxygenError error) noexcept override {
        LOG(ERROR) << "HttpClientHandler Error: "
                   << proxygen::getErrorString(error);
        delete this;
    }
};

/**
 * Global gtest environment for the HttpClient test: starts the web service
 * with an HttpClientHandler registered under "/path" before the tests run,
 * and stops the service afterwards.
 */
class HttpClientTestEnv : public ::testing::Environment {
public:
    // Register the test route and start the web service; a failed start
    // is reported as a test failure.
    void SetUp() override {
        // NOTE(review): port 0 presumably lets the service pick free
        // ports -- confirm against WebService::start().
        FLAGS_ws_h2_port = 0;
        FLAGS_ws_http_port = 0;
        LOG(INFO) << "Starting web service...";

        // Factory invoked by the web service to obtain a handler for "/path".
        auto makeHandler = [] {
            return new HttpClientHandler();
        };
        WebService::registerHandler("/path", makeHandler);

        auto startStatus = WebService::start();
        ASSERT_TRUE(startStatus.ok()) << startStatus;
    }

    // Shut the web service down once all tests have finished.
    void TearDown() override {
        WebService::stop();
        VLOG(1) << "Web service stopped";
    }
};

// Exercises HttpClient::get against the web service started by the test
// environment: once for the registered route and once for an unknown one.
TEST(HttpClient, get) {
    // Builds the URL of `route` on the locally running web service.
    const auto urlOf = [](const char* route) {
        return folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(),
                                   FLAGS_ws_http_port, route);
    };

    // Registered route: the handler's fixed body must come back.
    auto found = HttpClient::get(urlOf("/path"));
    ASSERT_TRUE(found.ok());
    ASSERT_EQ("HttpClientHandler successfully", found.value());

    // Unregistered route: the call still succeeds, with an empty body.
    auto missing = HttpClient::get(urlOf("/not_exist"));
    ASSERT_TRUE(missing.ok());
    ASSERT_TRUE(missing.value().empty());
}

} // namespace http
} // namespace nebula

// Test entry point: initialise gtest, folly and logging, install the
// environment that runs the web service, then run every test.
int main(int argc, char** argv) {
    testing::InitGoogleTest(&argc, argv);
    folly::init(&argc, &argv, true);
    google::SetStderrLogging(google::INFO);

    // gtest takes ownership of registered environments, so the object is
    // intentionally not deleted here.
    auto* env = new nebula::http::HttpClientTestEnv();
    ::testing::AddGlobalTestEnvironment(env);

    const int exitCode = RUN_ALL_TESTS();
    return exitCode;
}
6 changes: 6 additions & 0 deletions src/daemons/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ nebula_add_executable(
$<TARGET_OBJECTS:meta_thrift_obj>
$<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:fs_obj>
$<TARGET_OBJECTS:http_client_obj>
$<TARGET_OBJECTS:stats_obj>
$<TARGET_OBJECTS:thread_obj>
$<TARGET_OBJECTS:parser_obj>
Expand All @@ -27,6 +28,7 @@ nebula_add_executable(
$<TARGET_OBJECTS:thrift_obj>
$<TARGET_OBJECTS:schema_obj>
$<TARGET_OBJECTS:ws_obj>
$<TARGET_OBJECTS:ws_common_obj>
$<TARGET_OBJECTS:dataman_obj>
$<TARGET_OBJECTS:meta_gflags_man_obj>
$<TARGET_OBJECTS:gflags_man_obj>
Expand Down Expand Up @@ -57,6 +59,7 @@ nebula_add_executable(
$<TARGET_OBJECTS:dataman_obj>
$<TARGET_OBJECTS:schema_obj>
$<TARGET_OBJECTS:hdfs_helper_obj>
$<TARGET_OBJECTS:http_client_obj>
$<TARGET_OBJECTS:filter_obj>
$<TARGET_OBJECTS:base_obj>
$<TARGET_OBJECTS:thrift_obj>
Expand All @@ -67,6 +70,7 @@ nebula_add_executable(
$<TARGET_OBJECTS:network_obj>
$<TARGET_OBJECTS:process_obj>
$<TARGET_OBJECTS:ws_obj>
$<TARGET_OBJECTS:ws_common_obj>
$<TARGET_OBJECTS:meta_gflags_man_obj>
$<TARGET_OBJECTS:gflags_man_obj>
LIBRARIES
Expand Down Expand Up @@ -96,6 +100,7 @@ nebula_add_executable(
$<TARGET_OBJECTS:raftex_thrift_obj>
$<TARGET_OBJECTS:wal_obj>
$<TARGET_OBJECTS:hdfs_helper_obj>
$<TARGET_OBJECTS:http_client_obj>
$<TARGET_OBJECTS:base_obj>
$<TARGET_OBJECTS:thrift_obj>
$<TARGET_OBJECTS:network_obj>
Expand All @@ -105,6 +110,7 @@ nebula_add_executable(
$<TARGET_OBJECTS:stats_obj>
$<TARGET_OBJECTS:process_obj>
$<TARGET_OBJECTS:ws_obj>
$<TARGET_OBJECTS:ws_common_obj>
$<TARGET_OBJECTS:kv_gflags_man_obj>
$<TARGET_OBJECTS:gflags_man_obj>
LIBRARIES
Expand Down
2 changes: 1 addition & 1 deletion src/daemons/GraphDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ int main(int argc, char *argv[]) {

Status setupSignalHandler() {
return nebula::SignalHandler::install(
{SIGINT, SIGTERM},
{SIGINT, SIGTERM},
[](nebula::SignalHandler::GeneralSignalInfo *info) {
signalHandler(info->sig());
});
Expand Down
16 changes: 14 additions & 2 deletions src/daemons/MetaDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "common/base/SignalHandler.h"
#include <thrift/lib/cpp2/server/ThriftServer.h>
#include "meta/MetaServiceHandler.h"
#include "meta/MetaHttpIngestHandler.h"
#include "meta/MetaHttpStatusHandler.h"
#include "meta/MetaHttpDownloadHandler.h"
#include "webservice/WebService.h"
Expand All @@ -34,6 +35,7 @@ DEFINE_string(peers, "", "It is a list of IPs split by comma,"
"If empty, it means replica is 1");
DEFINE_string(local_ip, "", "Local ip speicified for NetworkUtils::getLocalIP");
DEFINE_int32(num_io_threads, 16, "Number of IO threads");
DEFINE_int32(meta_http_thread_num, 3, "Number of meta daemon's http thread");
DEFINE_int32(num_worker_threads, 32, "Number of workers");
DECLARE_string(part_man_type);

Expand Down Expand Up @@ -123,7 +125,7 @@ int main(int argc, char *argv[]) {
LOG(ERROR) << "nebula store init failed";
return EXIT_FAILURE;
}

auto clusterMan
= std::make_unique<nebula::meta::ClusterManager>(FLAGS_peers, "");
if (!clusterMan->loadOrCreateCluId(kvstore.get())) {
Expand All @@ -133,13 +135,23 @@ int main(int argc, char *argv[]) {
std::unique_ptr<nebula::hdfs::HdfsHelper> helper =
std::make_unique<nebula::hdfs::HdfsCommandHelper>();

std::unique_ptr<nebula::thread::GenericThreadPool> pool =
std::make_unique<nebula::thread::GenericThreadPool>();
pool->start(FLAGS_meta_http_thread_num, "http thread pool");
LOG(INFO) << "Http Thread Pool started";

LOG(INFO) << "Starting Meta HTTP Service";
nebula::WebService::registerHandler("/status", [] {
return new nebula::meta::MetaHttpStatusHandler();
});
nebula::WebService::registerHandler("/download-dispatch", [&] {
auto handler = new nebula::meta::MetaHttpDownloadHandler();
handler->init(kvstore.get(), helper.get());
handler->init(kvstore.get(), helper.get(), pool.get());
return handler;
});
nebula::WebService::registerHandler("/ingest-dispatch", [&] {
auto handler = new nebula::meta::MetaHttpIngestHandler();
handler->init(kvstore.get(), pool.get());
return handler;
});
status = nebula::WebService::start();
Expand Down
21 changes: 16 additions & 5 deletions src/daemons/StorageDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "network/NetworkUtils.h"
#include "thread/GenericThreadPool.h"
#include "storage/StorageServiceHandler.h"
#include "storage/StorageHttpIngestHandler.h"
#include "storage/StorageHttpStatusHandler.h"
#include "storage/StorageHttpDownloadHandler.h"
#include "storage/StorageHttpAdminHandler.h"
Expand Down Expand Up @@ -41,6 +42,7 @@ DEFINE_string(store_type, "nebula",
"Which type of KVStore to be used by the storage daemon."
" Options can be \"nebula\", \"hbase\", etc.");
DEFINE_int32(num_io_threads, 16, "Number of IO threads");
DEFINE_int32(storage_http_thread_num, 3, "Number of storage daemon's http thread");
DEFINE_int32(num_worker_threads, 32, "Number of workers");

using nebula::operator<<;
Expand Down Expand Up @@ -193,7 +195,7 @@ int main(int argc, char *argv[]) {

LOG(INFO) << "Init kvstore";
std::unique_ptr<KVStore> kvstore = getStoreInstance(localhost,
std::move(paths),
paths,
ioThreadPool,
threadManager,
metaClient.get(),
Expand All @@ -205,15 +207,24 @@ int main(int argc, char *argv[]) {

std::unique_ptr<nebula::hdfs::HdfsHelper> helper =
std::make_unique<nebula::hdfs::HdfsCommandHelper>();
auto* helperPtr = helper.get();

std::unique_ptr<nebula::thread::GenericThreadPool> pool =
std::make_unique<nebula::thread::GenericThreadPool>();
pool->start(FLAGS_storage_http_thread_num, "http thread pool");
LOG(INFO) << "Http Thread Pool started";

LOG(INFO) << "Starting Storage HTTP Service";
nebula::WebService::registerHandler("/status", [] {
return new nebula::storage::StorageHttpStatusHandler();
});
nebula::WebService::registerHandler("/download", [helperPtr] {
auto* handler = new nebula::storage::StorageHttpDownloadHandler();
handler->init(helperPtr);
nebula::WebService::registerHandler("/download", [&] {
auto handler = new nebula::storage::StorageHttpDownloadHandler();
handler->init(helper.get(), pool.get(), kvstore.get(), paths);
return handler;
});
nebula::WebService::registerHandler("/ingest", [&] {
auto handler = new nebula::storage::StorageHttpIngestHandler();
handler->init(kvstore.get());
return handler;
});
nebula::WebService::registerHandler("/admin", [&] {
Expand Down
1 change: 1 addition & 0 deletions src/graph/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ add_library(
YieldExecutor.cpp
DownloadExecutor.cpp
OrderByExecutor.cpp
IngestExecutor.cpp
ConfigExecutor.cpp
SchemaHelper.cpp
)
Expand Down
Loading

0 comments on commit c16b72c

Please sign in to comment.