Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Ingest SST Files #654

Merged
merged 22 commits into from
Aug 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ add_subdirectory(concurrent)
add_subdirectory(thread)
add_subdirectory(process)
add_subdirectory(hdfs)
add_subdirectory(http)
add_subdirectory(stats)
add_subdirectory(filter)
add_subdirectory(test)
6 changes: 6 additions & 0 deletions src/common/fs/FileUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,12 @@ bool FileUtils::makeDir(const std::string& dir) {
return true;
}

bool FileUtils::exist(const std::string& path) {
if (path.empty()) {
return false;
}
return access(path.c_str(), F_OK) == 0;
}

std::vector<std::string> FileUtils::listAllTypedEntitiesInDir(
const char* dirpath,
Expand Down
2 changes: 2 additions & 0 deletions src/common/fs/FileUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ class FileUtils final {
// It will make the parent directories as needed
// (much like commandline "mkdir -p")
static bool makeDir(const std::string& dir);
// Check the path is exist
static bool exist(const std::string& path);

/**
* List all entities in the given directory, whose type matches
Expand Down
5 changes: 5 additions & 0 deletions src/common/http/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
add_library(http_client_obj OBJECT HttpClient.cpp)

add_dependencies(http_client_obj process_obj base_obj)

add_subdirectory(test)
25 changes: 25 additions & 0 deletions src/common/http/HttpClient.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/* Copyright (c) 2019 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include "http/HttpClient.h"
#include "process/ProcessUtils.h"

namespace nebula {
namespace http {

StatusOr<std::string> HttpClient::get(const std::string& path) {
auto command = folly::stringPrintf("/usr/bin/curl -G \"%s\"", path.c_str());
LOG(INFO) << "HTTP Get Command: " << command;
auto result = nebula::ProcessUtils::runCommand(command.c_str());
if (result.ok()) {
return result.value();
} else {
return Status::Error(folly::stringPrintf("Http Get Failed: %s", path.c_str()));
}
}

} // namespace http
} // namespace nebula
29 changes: 29 additions & 0 deletions src/common/http/HttpClient.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/* Copyright (c) 2019 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#ifndef COMMON_HTTPCLIENT_H
#define COMMON_HTTPCLIENT_H

#include "base/Base.h"
#include "base/StatusOr.h"

namespace nebula {
namespace http {

class HttpClient {
public:
HttpClient() = delete;

~HttpClient() = default;

static StatusOr<std::string> get(const std::string& path);
};

} // namespace http
} // namespace nebula

#endif // COMMON_HTTPCLIENT_H

20 changes: 20 additions & 0 deletions src/common/http/test/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
nebula_add_test(
NAME
http_client_test
SOURCES
HttpClientTest.cpp
OBJECTS
$<TARGET_OBJECTS:http_client_obj>
$<TARGET_OBJECTS:ws_obj>
$<TARGET_OBJECTS:process_obj>
$<TARGET_OBJECTS:fs_obj>
$<TARGET_OBJECTS:stats_obj>
$<TARGET_OBJECTS:base_obj>
LIBRARIES
proxygenhttpserver
proxygenlib
wangle
gtest
gtest_main
)

95 changes: 95 additions & 0 deletions src/common/http/test/HttpClientTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/* Copyright (c) 2019 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include "base/Base.h"
#include "http/HttpClient.h"
#include <gtest/gtest.h>
#include "webservice/Common.h"
#include "webservice/WebService.h"
#include "proxygen/httpserver/RequestHandler.h"
#include <proxygen/httpserver/ResponseBuilder.h>

namespace nebula {
namespace http {

class HttpClientHandler : public proxygen::RequestHandler {
public:
HttpClientHandler() = default;

void onRequest(std::unique_ptr<proxygen::HTTPMessage>) noexcept override {
}

void onBody(std::unique_ptr<folly::IOBuf>) noexcept override {
}

void onEOM() noexcept override {
proxygen::ResponseBuilder(downstream_)
.status(WebServiceUtils::to(HttpStatusCode::OK),
WebServiceUtils::toString(HttpStatusCode::OK))
.body("HttpClientHandler successfully")
.sendWithEOM();
}

void onUpgrade(proxygen::UpgradeProtocol) noexcept override {
}

void requestComplete() noexcept override {
delete this;
}

void onError(proxygen::ProxygenError error) noexcept override {
LOG(ERROR) << "HttpClientHandler Error: "
<< proxygen::getErrorString(error);
}
};
class HttpClientTestEnv : public ::testing::Environment {
public:
void SetUp() override {
FLAGS_ws_http_port = 0;
FLAGS_ws_h2_port = 0;
LOG(INFO) << "Starting web service...";

WebService::registerHandler("/path", [] {
return new HttpClientHandler();
});
auto status = WebService::start();
ASSERT_TRUE(status.ok()) << status;
}

void TearDown() override {
WebService::stop();
VLOG(1) << "Web service stopped";
}
};

TEST(HttpClient, get) {
{
auto url = folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(),
FLAGS_ws_http_port, "/path");
auto result = HttpClient::get(url);
ASSERT_TRUE(result.ok());
ASSERT_EQ("HttpClientHandler successfully", result.value());
}
{
auto url = folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(),
FLAGS_ws_http_port, "/not_exist");
auto result = HttpClient::get(url);
ASSERT_TRUE(result.ok());
ASSERT_TRUE(result.value().empty());
}
}

} // namespace http
} // namespace nebula

int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
folly::init(&argc, &argv, true);
google::SetStderrLogging(google::INFO);

::testing::AddGlobalTestEnvironment(new nebula::http::HttpClientTestEnv());
return RUN_ALL_TESTS();
}
6 changes: 6 additions & 0 deletions src/daemons/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ nebula_add_executable(
$<TARGET_OBJECTS:meta_thrift_obj>
$<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:fs_obj>
$<TARGET_OBJECTS:http_client_obj>
$<TARGET_OBJECTS:stats_obj>
$<TARGET_OBJECTS:thread_obj>
$<TARGET_OBJECTS:parser_obj>
Expand All @@ -27,6 +28,7 @@ nebula_add_executable(
$<TARGET_OBJECTS:thrift_obj>
$<TARGET_OBJECTS:schema_obj>
$<TARGET_OBJECTS:ws_obj>
$<TARGET_OBJECTS:ws_common_obj>
$<TARGET_OBJECTS:dataman_obj>
$<TARGET_OBJECTS:meta_gflags_man_obj>
$<TARGET_OBJECTS:gflags_man_obj>
Expand Down Expand Up @@ -57,6 +59,7 @@ nebula_add_executable(
$<TARGET_OBJECTS:dataman_obj>
$<TARGET_OBJECTS:schema_obj>
$<TARGET_OBJECTS:hdfs_helper_obj>
$<TARGET_OBJECTS:http_client_obj>
$<TARGET_OBJECTS:filter_obj>
$<TARGET_OBJECTS:base_obj>
$<TARGET_OBJECTS:thrift_obj>
Expand All @@ -67,6 +70,7 @@ nebula_add_executable(
$<TARGET_OBJECTS:network_obj>
$<TARGET_OBJECTS:process_obj>
$<TARGET_OBJECTS:ws_obj>
$<TARGET_OBJECTS:ws_common_obj>
$<TARGET_OBJECTS:meta_gflags_man_obj>
$<TARGET_OBJECTS:gflags_man_obj>
LIBRARIES
Expand Down Expand Up @@ -96,6 +100,7 @@ nebula_add_executable(
$<TARGET_OBJECTS:raftex_thrift_obj>
$<TARGET_OBJECTS:wal_obj>
$<TARGET_OBJECTS:hdfs_helper_obj>
$<TARGET_OBJECTS:http_client_obj>
$<TARGET_OBJECTS:base_obj>
$<TARGET_OBJECTS:thrift_obj>
$<TARGET_OBJECTS:network_obj>
Expand All @@ -105,6 +110,7 @@ nebula_add_executable(
$<TARGET_OBJECTS:stats_obj>
$<TARGET_OBJECTS:process_obj>
$<TARGET_OBJECTS:ws_obj>
$<TARGET_OBJECTS:ws_common_obj>
$<TARGET_OBJECTS:kv_gflags_man_obj>
$<TARGET_OBJECTS:gflags_man_obj>
LIBRARIES
Expand Down
2 changes: 1 addition & 1 deletion src/daemons/GraphDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ int main(int argc, char *argv[]) {

Status setupSignalHandler() {
return nebula::SignalHandler::install(
{SIGINT, SIGTERM},
{SIGINT, SIGTERM},
[](nebula::SignalHandler::GeneralSignalInfo *info) {
signalHandler(info->sig());
});
Expand Down
16 changes: 14 additions & 2 deletions src/daemons/MetaDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "common/base/SignalHandler.h"
#include <thrift/lib/cpp2/server/ThriftServer.h>
#include "meta/MetaServiceHandler.h"
#include "meta/MetaHttpIngestHandler.h"
#include "meta/MetaHttpStatusHandler.h"
#include "meta/MetaHttpDownloadHandler.h"
#include "webservice/WebService.h"
Expand All @@ -34,6 +35,7 @@ DEFINE_string(peers, "", "It is a list of IPs split by comma,"
"If empty, it means replica is 1");
DEFINE_string(local_ip, "", "Local ip speicified for NetworkUtils::getLocalIP");
DEFINE_int32(num_io_threads, 16, "Number of IO threads");
DEFINE_int32(meta_http_thread_num, 3, "Number of meta daemon's http thread");
DEFINE_int32(num_worker_threads, 32, "Number of workers");
DECLARE_string(part_man_type);

Expand Down Expand Up @@ -123,7 +125,7 @@ int main(int argc, char *argv[]) {
LOG(ERROR) << "nebula store init failed";
return EXIT_FAILURE;
}

auto clusterMan
= std::make_unique<nebula::meta::ClusterManager>(FLAGS_peers, "");
if (!clusterMan->loadOrCreateCluId(kvstore.get())) {
Expand All @@ -133,13 +135,23 @@ int main(int argc, char *argv[]) {
std::unique_ptr<nebula::hdfs::HdfsHelper> helper =
std::make_unique<nebula::hdfs::HdfsCommandHelper>();

std::unique_ptr<nebula::thread::GenericThreadPool> pool =
std::make_unique<nebula::thread::GenericThreadPool>();
pool->start(FLAGS_meta_http_thread_num, "http thread pool");
LOG(INFO) << "Http Thread Pool started";

LOG(INFO) << "Starting Meta HTTP Service";
nebula::WebService::registerHandler("/status", [] {
return new nebula::meta::MetaHttpStatusHandler();
});
nebula::WebService::registerHandler("/download-dispatch", [&] {
auto handler = new nebula::meta::MetaHttpDownloadHandler();
handler->init(kvstore.get(), helper.get());
handler->init(kvstore.get(), helper.get(), pool.get());
return handler;
});
nebula::WebService::registerHandler("/ingest-dispatch", [&] {
auto handler = new nebula::meta::MetaHttpIngestHandler();
handler->init(kvstore.get(), pool.get());
return handler;
});
status = nebula::WebService::start();
Expand Down
21 changes: 16 additions & 5 deletions src/daemons/StorageDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "network/NetworkUtils.h"
#include "thread/GenericThreadPool.h"
#include "storage/StorageServiceHandler.h"
#include "storage/StorageHttpIngestHandler.h"
#include "storage/StorageHttpStatusHandler.h"
#include "storage/StorageHttpDownloadHandler.h"
#include "storage/StorageHttpAdminHandler.h"
Expand Down Expand Up @@ -41,6 +42,7 @@ DEFINE_string(store_type, "nebula",
"Which type of KVStore to be used by the storage daemon."
" Options can be \"nebula\", \"hbase\", etc.");
DEFINE_int32(num_io_threads, 16, "Number of IO threads");
DEFINE_int32(storage_http_thread_num, 3, "Number of storage daemon's http thread");
DEFINE_int32(num_worker_threads, 32, "Number of workers");

using nebula::operator<<;
Expand Down Expand Up @@ -193,7 +195,7 @@ int main(int argc, char *argv[]) {

LOG(INFO) << "Init kvstore";
std::unique_ptr<KVStore> kvstore = getStoreInstance(localhost,
std::move(paths),
paths,
ioThreadPool,
threadManager,
metaClient.get(),
Expand All @@ -205,15 +207,24 @@ int main(int argc, char *argv[]) {

std::unique_ptr<nebula::hdfs::HdfsHelper> helper =
std::make_unique<nebula::hdfs::HdfsCommandHelper>();
auto* helperPtr = helper.get();

std::unique_ptr<nebula::thread::GenericThreadPool> pool =
std::make_unique<nebula::thread::GenericThreadPool>();
pool->start(FLAGS_storage_http_thread_num, "http thread pool");
LOG(INFO) << "Http Thread Pool started";

LOG(INFO) << "Starting Storage HTTP Service";
nebula::WebService::registerHandler("/status", [] {
return new nebula::storage::StorageHttpStatusHandler();
});
nebula::WebService::registerHandler("/download", [helperPtr] {
auto* handler = new nebula::storage::StorageHttpDownloadHandler();
handler->init(helperPtr);
nebula::WebService::registerHandler("/download", [&] {
auto handler = new nebula::storage::StorageHttpDownloadHandler();
handler->init(helper.get(), pool.get(), kvstore.get(), paths);
return handler;
});
nebula::WebService::registerHandler("/ingest", [&] {
auto handler = new nebula::storage::StorageHttpIngestHandler();
handler->init(kvstore.get());
return handler;
});
nebula::WebService::registerHandler("/admin", [&] {
Expand Down
1 change: 1 addition & 0 deletions src/graph/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ add_library(
YieldExecutor.cpp
DownloadExecutor.cpp
OrderByExecutor.cpp
IngestExecutor.cpp
ConfigExecutor.cpp
SchemaHelper.cpp
)
Expand Down
Loading