diff --git a/src/common/CMakeLists.txt b/src/common/CMakeLists.txt index 54a7ad5f3fe..58844907c7a 100644 --- a/src/common/CMakeLists.txt +++ b/src/common/CMakeLists.txt @@ -7,6 +7,7 @@ add_subdirectory(concurrent) add_subdirectory(thread) add_subdirectory(process) add_subdirectory(hdfs) +add_subdirectory(http) add_subdirectory(stats) add_subdirectory(filter) add_subdirectory(test) diff --git a/src/common/fs/FileUtils.cpp b/src/common/fs/FileUtils.cpp index cd72b9b6fc9..7e0e50147ff 100644 --- a/src/common/fs/FileUtils.cpp +++ b/src/common/fs/FileUtils.cpp @@ -347,6 +347,12 @@ bool FileUtils::makeDir(const std::string& dir) { return true; } +bool FileUtils::exist(const std::string& path) { + if (path.empty()) { + return false; + } + return access(path.c_str(), F_OK) == 0; +} std::vector FileUtils::listAllTypedEntitiesInDir( const char* dirpath, diff --git a/src/common/fs/FileUtils.h b/src/common/fs/FileUtils.h index e90fb337a88..74248126e3d 100644 --- a/src/common/fs/FileUtils.h +++ b/src/common/fs/FileUtils.h @@ -106,6 +106,8 @@ class FileUtils final { // It will make the parent directories as needed // (much like commandline "mkdir -p") static bool makeDir(const std::string& dir); + // Check the path is exist + static bool exist(const std::string& path); /** * List all entities in the given directory, whose type matches diff --git a/src/common/http/CMakeLists.txt b/src/common/http/CMakeLists.txt new file mode 100644 index 00000000000..5850a80bb57 --- /dev/null +++ b/src/common/http/CMakeLists.txt @@ -0,0 +1,5 @@ +add_library(http_client_obj OBJECT HttpClient.cpp) + +add_dependencies(http_client_obj process_obj base_obj) + +add_subdirectory(test) diff --git a/src/common/http/HttpClient.cpp b/src/common/http/HttpClient.cpp new file mode 100644 index 00000000000..9dadf337721 --- /dev/null +++ b/src/common/http/HttpClient.cpp @@ -0,0 +1,25 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include "http/HttpClient.h" +#include "process/ProcessUtils.h" + +namespace nebula { +namespace http { + +StatusOr HttpClient::get(const std::string& path) { + auto command = folly::stringPrintf("/usr/bin/curl -G \"%s\"", path.c_str()); + LOG(INFO) << "HTTP Get Command: " << command; + auto result = nebula::ProcessUtils::runCommand(command.c_str()); + if (result.ok()) { + return result.value(); + } else { + return Status::Error(folly::stringPrintf("Http Get Failed: %s", path.c_str())); + } +} + +} // namespace http +} // namespace nebula diff --git a/src/common/http/HttpClient.h b/src/common/http/HttpClient.h new file mode 100644 index 00000000000..26e739470b3 --- /dev/null +++ b/src/common/http/HttpClient.h @@ -0,0 +1,29 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef COMMON_HTTPCLIENT_H +#define COMMON_HTTPCLIENT_H + +#include "base/Base.h" +#include "base/StatusOr.h" + +namespace nebula { +namespace http { + +class HttpClient { +public: + HttpClient() = delete; + + ~HttpClient() = default; + + static StatusOr get(const std::string& path); +}; + +} // namespace http +} // namespace nebula + +#endif // COMMON_HTTPCLIENT_H + diff --git a/src/common/http/test/CMakeLists.txt b/src/common/http/test/CMakeLists.txt new file mode 100644 index 00000000000..24e1b8dba85 --- /dev/null +++ b/src/common/http/test/CMakeLists.txt @@ -0,0 +1,20 @@ +nebula_add_test( + NAME + http_client_test + SOURCES + HttpClientTest.cpp + OBJECTS + $ + $ + $ + $ + $ + $ + LIBRARIES + proxygenhttpserver + proxygenlib + wangle + gtest + gtest_main +) + diff --git a/src/common/http/test/HttpClientTest.cpp b/src/common/http/test/HttpClientTest.cpp new file mode 100644 index 00000000000..e32589adb28 --- /dev/null +++ b/src/common/http/test/HttpClientTest.cpp @@ -0,0 +1,95 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include "base/Base.h" +#include "http/HttpClient.h" +#include +#include "webservice/Common.h" +#include "webservice/WebService.h" +#include "proxygen/httpserver/RequestHandler.h" +#include + +namespace nebula { +namespace http { + +class HttpClientHandler : public proxygen::RequestHandler { +public: + HttpClientHandler() = default; + + void onRequest(std::unique_ptr) noexcept override { + } + + void onBody(std::unique_ptr) noexcept override { + } + + void onEOM() noexcept override { + proxygen::ResponseBuilder(downstream_) + .status(WebServiceUtils::to(HttpStatusCode::OK), + WebServiceUtils::toString(HttpStatusCode::OK)) + .body("HttpClientHandler successfully") + .sendWithEOM(); + } + + void onUpgrade(proxygen::UpgradeProtocol) noexcept override { + } + + void requestComplete() noexcept override { + delete this; + } + + void onError(proxygen::ProxygenError error) noexcept override { + LOG(ERROR) << "HttpClientHandler Error: " + << proxygen::getErrorString(error); + } +}; +class HttpClientTestEnv : public ::testing::Environment { +public: + void SetUp() override { + FLAGS_ws_http_port = 0; + FLAGS_ws_h2_port = 0; + LOG(INFO) << "Starting web service..."; + + WebService::registerHandler("/path", [] { + return new HttpClientHandler(); + }); + auto status = WebService::start(); + ASSERT_TRUE(status.ok()) << status; + } + + void TearDown() override { + WebService::stop(); + VLOG(1) << "Web service stopped"; + } +}; + +TEST(HttpClient, get) { + { + auto url = folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), + FLAGS_ws_http_port, "/path"); + auto result = HttpClient::get(url); + ASSERT_TRUE(result.ok()); + ASSERT_EQ("HttpClientHandler successfully", result.value()); + } + { + auto url = folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), + FLAGS_ws_http_port, "/not_exist"); + auto result = HttpClient::get(url); + ASSERT_TRUE(result.ok()); + ASSERT_TRUE(result.value().empty()); + } +} + +} // namespace http +} // namespace nebula + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + folly::init(&argc, &argv, true); + google::SetStderrLogging(google::INFO); + + ::testing::AddGlobalTestEnvironment(new nebula::http::HttpClientTestEnv()); + return RUN_ALL_TESTS(); +} diff --git a/src/daemons/CMakeLists.txt b/src/daemons/CMakeLists.txt index dca742dfbb2..b46a882e901 100644 --- a/src/daemons/CMakeLists.txt +++ b/src/daemons/CMakeLists.txt @@ -18,6 +18,7 @@ nebula_add_executable( $ $ $ + $ $ $ $ @@ -27,6 +28,7 @@ nebula_add_executable( $ $ $ + $ $ $ $ @@ -57,6 +59,7 @@ nebula_add_executable( $ $ $ + $ $ $ $ @@ -67,6 +70,7 @@ nebula_add_executable( $ $ $ + $ $ $ LIBRARIES @@ -96,6 +100,7 @@ nebula_add_executable( $ $ $ + $ $ $ $ @@ -105,6 +110,7 @@ nebula_add_executable( $ $ $ + $ $ $ LIBRARIES diff --git a/src/daemons/GraphDaemon.cpp b/src/daemons/GraphDaemon.cpp index 4bc33ecdd27..cb795929d7d 100644 --- a/src/daemons/GraphDaemon.cpp +++ b/src/daemons/GraphDaemon.cpp @@ -158,7 +158,7 @@ int main(int argc, char *argv[]) { Status setupSignalHandler() { return nebula::SignalHandler::install( - {SIGINT, SIGTERM}, + {SIGINT, SIGTERM}, [](nebula::SignalHandler::GeneralSignalInfo *info) { signalHandler(info->sig()); }); diff --git a/src/daemons/MetaDaemon.cpp b/src/daemons/MetaDaemon.cpp index 756751a58a6..ee60b26a95c 100644 --- a/src/daemons/MetaDaemon.cpp +++ b/src/daemons/MetaDaemon.cpp @@ -8,6 +8,7 @@ #include "common/base/SignalHandler.h" #include #include "meta/MetaServiceHandler.h" +#include "meta/MetaHttpIngestHandler.h" #include "meta/MetaHttpStatusHandler.h" #include "meta/MetaHttpDownloadHandler.h" #include "webservice/WebService.h" @@ -34,6 +35,7 @@ DEFINE_string(peers, "", "It is a list of IPs split by comma," "If empty, it means replica is 1"); DEFINE_string(local_ip, "", "Local ip speicified for NetworkUtils::getLocalIP"); DEFINE_int32(num_io_threads, 16, "Number of IO threads"); +DEFINE_int32(meta_http_thread_num, 3, "Number of meta daemon's http thread"); DEFINE_int32(num_worker_threads, 32, "Number of workers"); DECLARE_string(part_man_type); @@ -123,7 +125,7 @@ int main(int argc, char *argv[]) { LOG(ERROR) << "nebula store init failed"; return EXIT_FAILURE; } - + auto clusterMan = std::make_unique(FLAGS_peers, ""); if (!clusterMan->loadOrCreateCluId(kvstore.get())) { @@ -133,13 +135,23 @@ int main(int argc, char *argv[]) { std::unique_ptr helper = std::make_unique(); + std::unique_ptr pool = + std::make_unique(); + pool->start(FLAGS_meta_http_thread_num, "http thread pool"); + LOG(INFO) << "Http Thread Pool started"; + LOG(INFO) << "Starting Meta HTTP Service"; nebula::WebService::registerHandler("/status", [] { return new nebula::meta::MetaHttpStatusHandler(); }); nebula::WebService::registerHandler("/download-dispatch", [&] { auto handler = new nebula::meta::MetaHttpDownloadHandler(); - handler->init(kvstore.get(), helper.get()); + handler->init(kvstore.get(), helper.get(), pool.get()); + return handler; + }); + nebula::WebService::registerHandler("/ingest-dispatch", [&] { + auto handler = new nebula::meta::MetaHttpIngestHandler(); + handler->init(kvstore.get(), pool.get()); return handler; }); status = nebula::WebService::start(); diff --git a/src/daemons/StorageDaemon.cpp b/src/daemons/StorageDaemon.cpp index db350467eb0..957ac7cd72d 100644 --- a/src/daemons/StorageDaemon.cpp +++ b/src/daemons/StorageDaemon.cpp @@ -10,6 +10,7 @@ #include "network/NetworkUtils.h" #include "thread/GenericThreadPool.h" #include "storage/StorageServiceHandler.h" +#include "storage/StorageHttpIngestHandler.h" #include "storage/StorageHttpStatusHandler.h" #include "storage/StorageHttpDownloadHandler.h" #include "storage/StorageHttpAdminHandler.h" @@ -41,6 +42,7 @@ DEFINE_string(store_type, "nebula", "Which type of KVStore to be used by the storage daemon." " Options can be \"nebula\", \"hbase\", etc."); DEFINE_int32(num_io_threads, 16, "Number of IO threads"); +DEFINE_int32(storage_http_thread_num, 3, "Number of storage daemon's http thread"); DEFINE_int32(num_worker_threads, 32, "Number of workers"); using nebula::operator<<; @@ -193,7 +195,7 @@ int main(int argc, char *argv[]) { LOG(INFO) << "Init kvstore"; std::unique_ptr kvstore = getStoreInstance(localhost, - std::move(paths), + paths, ioThreadPool, threadManager, metaClient.get(), @@ -205,15 +207,24 @@ int main(int argc, char *argv[]) { std::unique_ptr helper = std::make_unique(); - auto* helperPtr = helper.get(); + + std::unique_ptr pool = + std::make_unique(); + pool->start(FLAGS_storage_http_thread_num, "http thread pool"); + LOG(INFO) << "Http Thread Pool started"; LOG(INFO) << "Starting Storage HTTP Service"; nebula::WebService::registerHandler("/status", [] { return new nebula::storage::StorageHttpStatusHandler(); }); - nebula::WebService::registerHandler("/download", [helperPtr] { - auto* handler = new nebula::storage::StorageHttpDownloadHandler(); - handler->init(helperPtr); + nebula::WebService::registerHandler("/download", [&] { + auto handler = new nebula::storage::StorageHttpDownloadHandler(); + handler->init(helper.get(), pool.get(), kvstore.get(), paths); + return handler; + }); + nebula::WebService::registerHandler("/ingest", [&] { + auto handler = new nebula::storage::StorageHttpIngestHandler(); + handler->init(kvstore.get()); return handler; }); nebula::WebService::registerHandler("/admin", [&] { diff --git a/src/graph/CMakeLists.txt b/src/graph/CMakeLists.txt index f043fe36d22..d8753dc0850 100644 --- a/src/graph/CMakeLists.txt +++ b/src/graph/CMakeLists.txt @@ -35,6 +35,7 @@ add_library( YieldExecutor.cpp DownloadExecutor.cpp OrderByExecutor.cpp + IngestExecutor.cpp ConfigExecutor.cpp SchemaHelper.cpp ) diff --git a/src/graph/DownloadExecutor.cpp b/src/graph/DownloadExecutor.cpp index c0b916aa0b2..ced5a94cc16 100644 --- a/src/graph/DownloadExecutor.cpp +++ b/src/graph/DownloadExecutor.cpp @@ -4,18 +4,16 @@ * attached with Common Clause Condition 1.0, found in the LICENSES directory. */ +#include "base/StatusOr.h" +#include "http/HttpClient.h" #include "graph/DownloadExecutor.h" #include "process/ProcessUtils.h" -#include "base/StatusOr.h" -#include -#include +#include "webservice/Common.h" #include #include #include -DEFINE_int32(meta_http_port, 11000, "Default meta daemon's http port"); - namespace nebula { namespace graph { @@ -36,22 +34,19 @@ void DownloadExecutor::execute() { auto *hdfsHost = sentence_->host(); auto hdfsPort = sentence_->port(); auto *hdfsPath = sentence_->path(); - auto *hdfsLocal = sentence_->localPath(); - if (hdfsHost == nullptr || hdfsPort == 0 || hdfsPath == nullptr || hdfsLocal == nullptr) { + if (hdfsHost == nullptr || hdfsPort == 0 || hdfsPath == nullptr) { LOG(ERROR) << "URL Parse Failed"; resp_ = std::make_unique(); onError_(Status::Error("URL Parse Failed")); return; } - auto func = [metaHost, hdfsHost, hdfsPort, hdfsPath, hdfsLocal, spaceId]() { - auto tmp = "%s \"http://%s:%d/%s?host=%s&port=%d&path=%s&local=%s&space=%d\""; - auto command = folly::stringPrintf(tmp, "/usr/bin/curl -G", metaHost.c_str(), - FLAGS_meta_http_port, "download-dispatch", - hdfsHost->c_str(), hdfsPort, hdfsPath->c_str(), - hdfsLocal->c_str(), spaceId); - LOG(INFO) << "Download Command: " << command; - auto result = nebula::ProcessUtils::runCommand(command.c_str()); + auto func = [metaHost, hdfsHost, hdfsPort, hdfsPath, spaceId]() { + static const char *tmp = "http://%s:%d/%s?host=%s&port=%d&path=%s&space=%d"; + auto url = folly::stringPrintf(tmp, metaHost.c_str(), FLAGS_ws_meta_http_port, + "download-dispatch", hdfsHost->c_str(), + hdfsPort, hdfsPath->c_str(), spaceId); + auto result = http::HttpClient::get(url); if (result.ok() && result.value() == "SSTFile dispatch successfully") { LOG(INFO) << "Download Successfully"; return true; diff --git a/src/graph/Executor.cpp b/src/graph/Executor.cpp index 2aaf2df3628..d07c59c1ad6 100644 --- a/src/graph/Executor.cpp +++ b/src/graph/Executor.cpp @@ -33,6 +33,7 @@ #include "graph/YieldExecutor.h" #include "graph/DownloadExecutor.h" #include "graph/OrderByExecutor.h" +#include "graph/IngestExecutor.h" #include "graph/ConfigExecutor.h" namespace nebula { @@ -111,6 +112,9 @@ std::unique_ptr Executor::makeExecutor(Sentence *sentence) { case Sentence::Kind::kOrderBy: executor = std::make_unique(sentence, ectx()); break; + case Sentence::Kind::kIngest: + executor = std::make_unique(sentence, ectx()); + break; case Sentence::Kind::kConfig: executor = std::make_unique(sentence, ectx()); break; diff --git a/src/graph/IngestExecutor.cpp b/src/graph/IngestExecutor.cpp new file mode 100644 index 00000000000..ecb071392d8 --- /dev/null +++ b/src/graph/IngestExecutor.cpp @@ -0,0 +1,77 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include "http/HttpClient.h" +#include "graph/IngestExecutor.h" +#include "process/ProcessUtils.h" +#include "webservice/Common.h" + +#include +#include + +namespace nebula { +namespace graph { + +IngestExecutor::IngestExecutor(Sentence *sentence, + ExecutionContext *ectx) : Executor(ectx) { + sentence_ = static_cast(sentence); +} + +Status IngestExecutor::prepare() { + return checkIfGraphSpaceChosen(); +} + +void IngestExecutor::execute() { + auto *mc = ectx()->getMetaClient(); + auto addresses = mc->getAddresses(); + auto metaHost = network::NetworkUtils::intToIPv4(addresses[0].first); + auto spaceId = ectx()->rctx()->session()->space(); + + auto func = [metaHost, spaceId]() { + static const char *tmp = "http://%s:%d/%s?space=%d"; + auto url = folly::stringPrintf(tmp, metaHost.c_str(), + FLAGS_ws_meta_http_port, + "ingest-dispatch", spaceId); + auto result = http::HttpClient::get(url); + if (result.ok() && result.value() == "SSTFile ingest successfully") { + LOG(INFO) << "Ingest Successfully"; + return true; + } else { + LOG(ERROR) << "Ingest Failed"; + return false; + } + }; + auto future = folly::async(func); + + auto *runner = ectx()->rctx()->runner(); + + auto cb = [this] (auto &&resp) { + if (!resp) { + DCHECK(onError_); + onError_(Status::Error("Ingest Failed")); + return; + } + resp_ = std::make_unique(); + DCHECK(onFinish_); + onFinish_(); + }; + + auto error = [this] (auto &&e) { + LOG(ERROR) << "Exception caught: " << e.what(); + DCHECK(onError_); + onError_(Status::Error("Internal error")); + return; + }; + + std::move(future).via(runner).thenValue(cb).thenError(error); +} + +void IngestExecutor::setupResponse(cpp2::ExecutionResponse &resp) { + resp = std::move(*resp_); +} + +} // namespace graph +} // namespace nebula diff --git a/src/graph/IngestExecutor.h b/src/graph/IngestExecutor.h new file mode 100644 index 00000000000..1ceb4a54dd2 --- /dev/null +++ b/src/graph/IngestExecutor.h @@ -0,0 +1,39 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef GRAPH_INGESTEXECUTOR_H +#define GRAPH_INGESTEXECUTOR_H + +#include "base/Base.h" +#include "graph/Executor.h" + +namespace nebula { +namespace graph { + +class IngestExecutor final : public Executor { +public: + IngestExecutor(Sentence *sentence, ExecutionContext *ectx); + + const char* name() const override { + return "IngestExecutor"; + } + + Status MUST_USE_RESULT prepare() override; + + void execute() override; + + void setupResponse(cpp2::ExecutionResponse &resp) override; + +private: + IngestSentence *sentence_{nullptr}; + std::unique_ptr resp_; +}; + +} // namespace graph +} // namespace nebula + +#endif // GRAPH_INGESTEXECUTOR_H + diff --git a/src/graph/test/CMakeLists.txt b/src/graph/test/CMakeLists.txt index bd51563b254..90052565e4c 100644 --- a/src/graph/test/CMakeLists.txt +++ b/src/graph/test/CMakeLists.txt @@ -40,6 +40,8 @@ nebula_add_test( SOURCES SessionManagerTest.cpp OBJECTS + $ + $ ${GRAPH_TEST_LIBS} LIBRARIES ${THRIFT_LIBRARIES} @@ -58,6 +60,8 @@ nebula_add_test( ConfigTest.cpp OBJECTS $ + $ + $ $ $ ${GRAPH_TEST_LIBS} @@ -75,7 +79,9 @@ nebula_add_test( GoTest.cpp OBJECTS $ + $ $ + $ $ ${GRAPH_TEST_LIBS} LIBRARIES @@ -93,8 +99,10 @@ nebula_add_test( OBJECTS $ $ - $ $ + $ + $ + $ $ $ $ @@ -115,6 +123,9 @@ nebula_add_test( DataTest.cpp OBJECTS $ + $ + $ + $ $ $ ${GRAPH_TEST_LIBS} @@ -132,6 +143,9 @@ nebula_add_test( OrderByTest.cpp OBJECTS $ + $ + $ + $ $ $ ${GRAPH_TEST_LIBS} diff --git a/src/graph/test/GraphHttpHandlerTest.cpp b/src/graph/test/GraphHttpHandlerTest.cpp index d384daeb501..9346560ccfb 100644 --- a/src/graph/test/GraphHttpHandlerTest.cpp +++ b/src/graph/test/GraphHttpHandlerTest.cpp @@ -6,12 +6,12 @@ #include "base/Base.h" #include +#include "http/HttpClient.h" #include "graph/test/TestEnv.h" #include "graph/test/TestBase.h" #include #include "graph/GraphHttpHandler.h" #include "webservice/WebService.h" -#include "webservice/test/TestUtils.h" namespace nebula { namespace graph { @@ -45,24 +45,37 @@ class GraphHttpHandlerTest : public TestBase { TEST_F(GraphHttpHandlerTest, GraphStatusTest) { { - std::string resp; - ASSERT_TRUE(getUrl("/status", resp)); - ASSERT_EQ(std::string("status=running\n"), resp); + auto url = "/status"; + auto request = folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), + FLAGS_ws_http_port, url); + auto resp = http::HttpClient::get(request); + ASSERT_TRUE(resp.ok()); + ASSERT_EQ("status=running\n", resp.value()); } { - std::string resp; - ASSERT_TRUE(getUrl("", resp)); - ASSERT_EQ(std::string("status=running\n"), resp); + auto url = ""; + auto request = folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), + FLAGS_ws_http_port, url); + auto resp = http::HttpClient::get(request); + ASSERT_TRUE(resp.ok()); + ASSERT_EQ("status=running\n", resp.value()); } { - std::string resp; - ASSERT_TRUE(getUrl("/status?daemon=status", resp)); - ASSERT_EQ(std::string("status=running\n"), resp); + auto url = "/status?daemon=status"; + auto request = folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), + FLAGS_ws_http_port, url); + auto resp = http::HttpClient::get(request); + ASSERT_TRUE(resp.ok()); + ASSERT_EQ("status=running\n", resp.value()); } { - std::string resp; - ASSERT_TRUE(getUrl("/status?daemon=status&returnjson", resp)); - auto json = folly::parseJson(resp); + auto url = "/status?daemon=status&returnjson"; + auto request = folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), + FLAGS_ws_http_port, url); + auto resp = http::HttpClient::get(request); + ASSERT_TRUE(resp.ok()); + + auto json = folly::parseJson(resp.value()); ASSERT_TRUE(json.isArray()); ASSERT_EQ(1UL, json.size()); ASSERT_TRUE(json[0].isObject()); @@ -78,11 +91,13 @@ TEST_F(GraphHttpHandlerTest, GraphStatusTest) { ASSERT_TRUE(it->second.isString()); ASSERT_EQ("running", it->second.getString()); } - { - std::string resp; - ASSERT_TRUE(getUrl("/status123?daemon=status", resp)); - ASSERT_TRUE(resp.empty()); + auto url = "/status123?daemon=status"; + auto request = folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), + FLAGS_ws_http_port, url); + auto resp = http::HttpClient::get(request); + ASSERT_TRUE(resp.ok()); + ASSERT_TRUE(resp.value().empty()); } } diff --git a/src/kvstore/KVStore.h b/src/kvstore/KVStore.h index e7dc7c31b0f..83be9e423e0 100644 --- a/src/kvstore/KVStore.h +++ b/src/kvstore/KVStore.h @@ -136,6 +136,8 @@ class KVStore { const std::string& prefix, KVCallback cb) = 0; + virtual ResultCode ingest(GraphSpaceID spaceId) = 0; + virtual ErrorOr> part(GraphSpaceID spaceId, PartitionID partId) = 0; diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index a93020606c6..2213b11fba8 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -409,9 +409,7 @@ ErrorOr> NebulaStore::part(GraphSpaceID spaceI } -ResultCode NebulaStore::ingest(GraphSpaceID spaceId, - const std::string& extra, - const std::vector& files) { +ResultCode NebulaStore::ingest(GraphSpaceID spaceId) { auto spaceRet = space(spaceId); if (!ok(spaceRet)) { return error(spaceRet); @@ -421,19 +419,29 @@ ResultCode NebulaStore::ingest(GraphSpaceID spaceId, auto parts = engine->allParts(); std::vector extras; for (auto part : parts) { + auto ret = this->engine(spaceId, part); + if (!ok(ret)) { + return error(ret); + } + + auto path = value(ret)->getDataRoot(); + LOG(INFO) << "Ingesting Part " << part; + if (!fs::FileUtils::exist(path)) { + LOG(ERROR) << path << " not existed"; + return ResultCode::ERR_IO_ERROR; + } + + auto files = nebula::fs::FileUtils::listAllFilesInDir(path, true, "*.sst"); for (auto file : files) { - auto extraPath = folly::stringPrintf("%s/nebula/%d/%d/%s", - extra.c_str(), - spaceId, - part, - file.c_str()); - LOG(INFO) << "Loading extra path : " << extraPath; - extras.emplace_back(std::move(extraPath)); + VLOG(3) << "Ingesting extra file: " << file; + extras.emplace_back(file); } } - auto code = engine->ingest(std::move(extras)); - if (code != ResultCode::SUCCEEDED) { - return code; + if (extras.size() != 0) { + auto code = engine->ingest(std::move(extras)); + if (code != ResultCode::SUCCEEDED) { + return code; + } } } return ResultCode::SUCCEEDED; diff --git a/src/kvstore/NebulaStore.h b/src/kvstore/NebulaStore.h index 5298077d060..4939a7086e4 100644 --- a/src/kvstore/NebulaStore.h +++ b/src/kvstore/NebulaStore.h @@ -142,9 +142,7 @@ class NebulaStore : public KVStore, public Handler { ErrorOr> part(GraphSpaceID spaceId, PartitionID partId) override; - ResultCode ingest(GraphSpaceID spaceId, - const std::string& extra, - const std::vector& files); + ResultCode ingest(GraphSpaceID spaceId) override; ResultCode setOption(GraphSpaceID spaceId, const std::string& configKey, diff --git a/src/kvstore/plugins/hbase/HBaseStore.cpp b/src/kvstore/plugins/hbase/HBaseStore.cpp index bb39086aaba..060c885b3af 100644 --- a/src/kvstore/plugins/hbase/HBaseStore.cpp +++ b/src/kvstore/plugins/hbase/HBaseStore.cpp @@ -379,6 +379,10 @@ void HBaseStore::asyncRemovePrefix(GraphSpaceID spaceId, return cb(removePrefix()); } +ResultCode HBaseStore::ingest(GraphSpaceID spaceId) { + UNUSED(spaceId); + return ResultCode::ERR_UNSUPPORTED; +} } // namespace kvstore } // namespace nebula diff --git a/src/kvstore/plugins/hbase/HBaseStore.h b/src/kvstore/plugins/hbase/HBaseStore.h index ad7bbf52fdf..279df3d365d 100644 --- a/src/kvstore/plugins/hbase/HBaseStore.h +++ b/src/kvstore/plugins/hbase/HBaseStore.h @@ -126,6 +126,8 @@ class HBaseStore : public KVStore { const std::string& prefix, KVCallback cb) override; + ResultCode ingest(GraphSpaceID spaceId) override; + ErrorOr> part(GraphSpaceID, PartitionID) override { return ResultCode::ERR_UNSUPPORTED; diff --git a/src/kvstore/test/NebulaStoreTest.cpp b/src/kvstore/test/NebulaStoreTest.cpp index e134cdefc9c..f8bde318a97 100644 --- a/src/kvstore/test/NebulaStoreTest.cpp +++ b/src/kvstore/test/NebulaStoreTest.cpp @@ -104,7 +104,7 @@ TEST(NebulaStoreTest, SimpleTest) { folly::stringPrintf("val_%d", i)); } folly::Baton baton; - store->asyncMultiPut(1, 1, std::move(data), [&] (ResultCode code){ + store->asyncMultiPut(1, 1, std::move(data), [&] (ResultCode code) { EXPECT_EQ(ResultCode::SUCCEEDED, code); baton.post(); }); @@ -334,7 +334,7 @@ TEST(NebulaStoreTest, ThreeCopiesTest) { auto index = findStoreIndex(leader); { folly::Baton baton; - stores[index]->asyncMultiPut(0, part, std::move(data), [&baton](ResultCode code){ + stores[index]->asyncMultiPut(0, part, std::move(data), [&baton](ResultCode code) { EXPECT_EQ(ResultCode::SUCCEEDED, code); baton.post(); }); diff --git a/src/meta/CMakeLists.txt b/src/meta/CMakeLists.txt index db947f8e8bc..6f2426108b0 100644 --- a/src/meta/CMakeLists.txt +++ b/src/meta/CMakeLists.txt @@ -49,6 +49,7 @@ nebula_add_library( nebula_add_library( meta_http_handler OBJECT + MetaHttpIngestHandler.cpp MetaHttpDownloadHandler.cpp MetaHttpStatusHandler.cpp ) diff --git a/src/meta/MetaHttpDownloadHandler.cpp b/src/meta/MetaHttpDownloadHandler.cpp index 0ec9cbf2061..994a674355b 100644 --- a/src/meta/MetaHttpDownloadHandler.cpp +++ b/src/meta/MetaHttpDownloadHandler.cpp @@ -10,15 +10,16 @@ #include "meta/MetaHttpDownloadHandler.h" #include "meta/MetaServiceUtils.h" #include "webservice/Common.h" +#include "webservice/WebService.h" #include "network/NetworkUtils.h" #include "hdfs/HdfsHelper.h" +#include "http/HttpClient.h" #include "process/ProcessUtils.h" +#include "thread/GenericThreadPool.h" #include #include #include -DEFINE_int32(storage_http_port, 12000, "Storage daemon's http port"); - namespace nebula { namespace meta { @@ -29,11 +30,14 @@ using proxygen::UpgradeProtocol; using proxygen::ResponseBuilder; void MetaHttpDownloadHandler::init(nebula::kvstore::KVStore *kvstore, - nebula::hdfs::HdfsHelper *helper) { + nebula::hdfs::HdfsHelper *helper, + nebula::thread::GenericThreadPool *pool) { kvstore_ = kvstore; helper_ = helper; + pool_ = pool; CHECK_NOTNULL(kvstore_); CHECK_NOTNULL(helper_); + CHECK_NOTNULL(pool_); } void MetaHttpDownloadHandler::onRequest(std::unique_ptr headers) noexcept { @@ -46,8 +50,8 @@ void MetaHttpDownloadHandler::onRequest(std::unique_ptr headers) no if (!headers->hasQueryParam("host") || !headers->hasQueryParam("port") || !headers->hasQueryParam("path") || - !headers->hasQueryParam("local") || !headers->hasQueryParam("space")) { + LOG(INFO) << "Illegal Argument"; err_ = HttpCode::E_ILLEGAL_ARGUMENT; return; } @@ -55,7 +59,6 @@ void MetaHttpDownloadHandler::onRequest(std::unique_ptr headers) no hdfsHost_ = headers->getQueryParam("host"); hdfsPort_ = headers->getIntQueryParam("port"); hdfsPath_ = headers->getQueryParam("path"); - localPath_ = headers->getQueryParam("local"); spaceID_ = headers->getIntQueryParam("space"); } @@ -69,12 +72,14 @@ void MetaHttpDownloadHandler::onEOM() noexcept { switch (err_) { case HttpCode::E_UNSUPPORTED_METHOD: ResponseBuilder(downstream_) - .status(405, "Method Not Allowed") + .status(WebServiceUtils::to(HttpStatusCode::METHOD_NOT_ALLOWED), + WebServiceUtils::toString(HttpStatusCode::METHOD_NOT_ALLOWED)) .sendWithEOM(); return; case HttpCode::E_ILLEGAL_ARGUMENT: ResponseBuilder(downstream_) - .status(400, "Bad Request") + .status(WebServiceUtils::to(HttpStatusCode::BAD_REQUEST), + WebServiceUtils::toString(HttpStatusCode::BAD_REQUEST)) .sendWithEOM(); return; default: @@ -82,22 +87,25 @@ void MetaHttpDownloadHandler::onEOM() noexcept { } if (helper_->checkHadoopPath()) { - if (dispatchSSTFiles(hdfsHost_, hdfsPort_, hdfsPath_, localPath_)) { + if (dispatchSSTFiles(hdfsHost_, hdfsPort_, hdfsPath_)) { ResponseBuilder(downstream_) - .status(200, "SSTFile dispatch successfully") + .status(WebServiceUtils::to(HttpStatusCode::OK), + WebServiceUtils::toString(HttpStatusCode::OK)) .body("SSTFile dispatch successfully") .sendWithEOM(); } else { LOG(ERROR) << "SSTFile dispatch failed"; ResponseBuilder(downstream_) - .status(404, "SSTFile dispatch failed") + .status(WebServiceUtils::to(HttpStatusCode::FORBIDDEN), + WebServiceUtils::toString(HttpStatusCode::FORBIDDEN)) .body("SSTFile dispatch failed") .sendWithEOM(); } } else { LOG(ERROR) << "Hadoop Home not exist"; ResponseBuilder(downstream_) - .status(404, "HADOOP_HOME not exist") + .status(WebServiceUtils::to(HttpStatusCode::NOT_FOUND), + WebServiceUtils::toString(HttpStatusCode::NOT_FOUND)) .sendWithEOM(); } } @@ -120,8 +128,7 @@ void MetaHttpDownloadHandler::onError(ProxygenError error) noexcept { bool MetaHttpDownloadHandler::dispatchSSTFiles(const std::string& hdfsHost, int hdfsPort, - const std::string& hdfsPath, - const std::string& localPath) { + const std::string& hdfsPath) { auto result = helper_->ls(hdfsHost, hdfsPort, hdfsPath); if (!result.ok()) { LOG(ERROR) << "Dispatch SSTFile Failed"; @@ -164,34 +171,43 @@ bool MetaHttpDownloadHandler::dispatchSSTFiles(const std::string& hdfsHost, return false; } - std::atomic completed(0); - std::vector threads; + std::vector> futures; + for (auto &pair : hostPartition) { std::string partsStr; folly::join(",", pair.second, partsStr); auto storageIP = network::NetworkUtils::intToIPv4(pair.first.first); - threads.push_back(std::thread([storageIP, hdfsHost, hdfsPort, hdfsPath, - partsStr, localPath, &completed]() { - auto tmp = "http://%s:%d/download?host=%s&port=%d&path=%s&parts=%s&local=%s"; - auto url = folly::stringPrintf(tmp, storageIP.c_str(), FLAGS_storage_http_port, - hdfsHost.c_str(), hdfsPort, hdfsPath.c_str(), - partsStr.c_str(), localPath.c_str()); - auto command = folly::stringPrintf("/usr/bin/curl -G \"%s\"", url.c_str()); - LOG(INFO) << "Command: " << command; - auto downloadResult = ProcessUtils::runCommand(command.c_str()); - if (!downloadResult.ok() || downloadResult.value() != "SSTFile download successfully") { - LOG(ERROR) << "Failed to download SST Files: " << downloadResult.value(); - } else { - completed++; - } - })); + auto dispatcher = [storageIP, hdfsHost, hdfsPort, hdfsPath, partsStr, this]() { + static const char *tmp = "http://%s:%d/%s?host=%s&port=%d&path=%s&parts=%s&space=%d"; + std::string url = folly::stringPrintf(tmp, storageIP.c_str(), + FLAGS_ws_storage_http_port, "download", + hdfsHost.c_str(), hdfsPort, hdfsPath.c_str(), + partsStr.c_str(), spaceID_); + auto downloadResult = nebula::http::HttpClient::get(url); + return downloadResult.ok() && downloadResult.value() == "SSTFile download successfully"; + }; + auto future = pool_->addTask(dispatcher); + futures.push_back(std::move(future)); } - for (auto &thread : threads) { - thread.join(); - } - return completed == (int32_t)hostPartition.size(); + bool successfully{true}; + folly::collectAll(std::move(futures)).then([&](const std::vector>& tries) { + for (const auto& t : tries) { + if (t.hasException()) { + LOG(ERROR) << "Download Failed: " << t.exception(); + successfully = false; + break; + } + if (!t.value()) { + successfully = false; + break; + } + } + }).wait(); + + LOG(INFO) << "Download tasks have finished"; + return successfully; } } // namespace meta diff --git a/src/meta/MetaHttpDownloadHandler.h b/src/meta/MetaHttpDownloadHandler.h index 215fd491609..3606acadd51 100644 --- a/src/meta/MetaHttpDownloadHandler.h +++ b/src/meta/MetaHttpDownloadHandler.h @@ -9,9 +9,10 @@ #include "base/Base.h" #include "webservice/Common.h" -#include "proxygen/httpserver/RequestHandler.h" #include "kvstore/KVStore.h" #include "hdfs/HdfsHelper.h" +#include "thread/GenericThreadPool.h" +#include namespace nebula { namespace meta { @@ -22,7 +23,9 @@ class MetaHttpDownloadHandler : public proxygen::RequestHandler { public: MetaHttpDownloadHandler() = default; - void init(nebula::kvstore::KVStore *kvstore, nebula::hdfs::HdfsHelper *helper); + void init(nebula::kvstore::KVStore *kvstore, + nebula::hdfs::HdfsHelper *helper, + nebula::thread::GenericThreadPool *pool); void onRequest(std::unique_ptr headers) noexcept override; @@ -39,18 +42,17 @@ class MetaHttpDownloadHandler : public proxygen::RequestHandler { private: bool dispatchSSTFiles(const std::string& host, int32_t port, - const std::string& path, - const std::string& local); + const std::string& path); private: HttpCode err_{HttpCode::SUCCEEDED}; std::string hdfsHost_; int32_t hdfsPort_; std::string hdfsPath_; - std::string localPath_; GraphSpaceID spaceID_; nebula::kvstore::KVStore *kvstore_; nebula::hdfs::HdfsHelper *helper_; + nebula::thread::GenericThreadPool *pool_; }; } // namespace meta diff --git a/src/meta/MetaHttpIngestHandler.cpp b/src/meta/MetaHttpIngestHandler.cpp new file mode 100644 index 00000000000..ad31d684bd5 --- /dev/null +++ b/src/meta/MetaHttpIngestHandler.cpp @@ -0,0 +1,164 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include "meta/MetaServiceUtils.h" +#include "meta/MetaHttpIngestHandler.h" +#include "webservice/Common.h" +#include "webservice/WebService.h" +#include "network/NetworkUtils.h" +#include "http/HttpClient.h" +#include "process/ProcessUtils.h" +#include "thread/GenericThreadPool.h" +#include +#include +#include + +DEFINE_int32(meta_ingest_thread_num, 3, "Meta daemon's ingest thread number"); + +namespace nebula { +namespace meta { + +using proxygen::HTTPMessage; +using proxygen::HTTPMethod; +using proxygen::ProxygenError; +using proxygen::UpgradeProtocol; +using proxygen::ResponseBuilder; + +void MetaHttpIngestHandler::init(nebula::kvstore::KVStore *kvstore, + nebula::thread::GenericThreadPool *pool) { + kvstore_ = kvstore; + pool_ = pool; + CHECK_NOTNULL(kvstore_); + CHECK_NOTNULL(pool_); +} + +void MetaHttpIngestHandler::onRequest(std::unique_ptr headers) noexcept { + if (headers->getMethod().value() != HTTPMethod::GET) { + // Unsupported method + err_ = HttpCode::E_UNSUPPORTED_METHOD; + return; + } + + if (!headers->hasQueryParam("space")) { + err_ = HttpCode::E_ILLEGAL_ARGUMENT; + return; + } + + space_ = headers->getIntQueryParam("space"); +} + +void MetaHttpIngestHandler::onBody(std::unique_ptr) noexcept { + // Do nothing, we only support GET +} + + +void MetaHttpIngestHandler::onEOM() noexcept { + switch (err_) { + case HttpCode::E_UNSUPPORTED_METHOD: + ResponseBuilder(downstream_) + .status(WebServiceUtils::to(HttpStatusCode::METHOD_NOT_ALLOWED), + WebServiceUtils::toString(HttpStatusCode::METHOD_NOT_ALLOWED)) + .sendWithEOM(); + return; + case HttpCode::E_ILLEGAL_ARGUMENT: + ResponseBuilder(downstream_) + .status(WebServiceUtils::to(HttpStatusCode::BAD_REQUEST), + WebServiceUtils::toString(HttpStatusCode::BAD_REQUEST)) + .sendWithEOM(); + return; + default: + break; + } + + if (ingestSSTFiles(space_)) { + LOG(INFO) << "SSTFile ingest successfully "; + ResponseBuilder(downstream_) + .status(WebServiceUtils::to(HttpStatusCode::OK), + WebServiceUtils::toString(HttpStatusCode::OK)) + .body("SSTFile ingest successfully") + .sendWithEOM(); + } else { + LOG(ERROR) << "SSTFile ingest failed"; + ResponseBuilder(downstream_) + .status(WebServiceUtils::to(HttpStatusCode::FORBIDDEN), + WebServiceUtils::toString(HttpStatusCode::FORBIDDEN)) + .body("SSTFile ingest failed") + .sendWithEOM(); + } +} + +void MetaHttpIngestHandler::onUpgrade(UpgradeProtocol) noexcept { + // Do nothing +} + + +void MetaHttpIngestHandler::requestComplete() noexcept { + delete this; +} + + +void MetaHttpIngestHandler::onError(ProxygenError error) noexcept { + LOG(ERROR) << "Web Service MetaHttpIngestHandler got error : " + << proxygen::getErrorString(error); +} + +bool MetaHttpIngestHandler::ingestSSTFiles(GraphSpaceID space) { + std::unique_ptr iter; + auto prefix = MetaServiceUtils::partPrefix(space); + + static const GraphSpaceID metaSpaceId = 0; + static const PartitionID metaPartId = 0; + auto ret = kvstore_->prefix(metaSpaceId, metaPartId, prefix, &iter); + if (ret != kvstore::ResultCode::SUCCEEDED) { + LOG(ERROR) << "Fetch Parts Failed"; + return false; + } + + std::set storageIPs; + while (iter->valid()) { + for (auto &host : MetaServiceUtils::parsePartVal(iter->val())) { + auto storageIP = network::NetworkUtils::intToIPv4(host.get_ip()); + if (std::find(storageIPs.begin(), storageIPs.end(), storageIP) == storageIPs.end()) { + storageIPs.insert(std::move(storageIP)); + } + } + iter->next(); + } + + std::vector> futures; + + for (auto &storageIP : storageIPs) { + auto dispatcher = [storageIP, space]() { + static const char *tmp = "http://%s:%d/ingest?space=%d"; + auto url = folly::stringPrintf(tmp, storageIP.c_str(), + FLAGS_ws_storage_http_port, space); + auto ingestResult = nebula::http::HttpClient::get(url); + return ingestResult.ok() && ingestResult.value() == "SSTFile ingest successfully"; + }; + auto future = pool_->addTask(dispatcher); + futures.push_back(std::move(future)); + } + + bool successfully{true}; + folly::collectAll(std::move(futures)).then([&](const std::vector>& tries) { + for (const auto& t : tries) { + if (t.hasException()) { + LOG(ERROR) << "Ingest Failed: " << t.exception(); + successfully = false; + break; + } + if (!t.value()) { + successfully = false; + break; + } + } + }).wait(); + LOG(INFO) << "Ingest tasks have finished"; + return successfully; +} + +} // namespace meta +} // namespace nebula diff --git a/src/meta/MetaHttpIngestHandler.h b/src/meta/MetaHttpIngestHandler.h new file mode 100644 index 00000000000..473c0fc313e --- /dev/null +++ b/src/meta/MetaHttpIngestHandler.h @@ -0,0 +1,53 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef META_METAHTTPINGESTHANDLER_H +#define META_METAHTTPINGESTHANDLER_H + +#include "base/Base.h" +#include "webservice/Common.h" +#include "kvstore/KVStore.h" +#include "thread/GenericThreadPool.h" +#include + +namespace nebula { +namespace meta { + +using nebula::HttpCode; + +class MetaHttpIngestHandler : public proxygen::RequestHandler { +public: + MetaHttpIngestHandler() = default; + + void init(nebula::kvstore::KVStore *kvstore, + nebula::thread::GenericThreadPool *pool); + + void onRequest(std::unique_ptr headers) noexcept override; + + void onBody(std::unique_ptr body) noexcept override; + + void onEOM() noexcept override; + + void onUpgrade(proxygen::UpgradeProtocol protocol) noexcept override; + + void requestComplete() noexcept override; + + void onError(proxygen::ProxygenError error) noexcept override; + + bool ingestSSTFiles(GraphSpaceID space); + +private: + HttpCode err_{HttpCode::SUCCEEDED}; + GraphSpaceID space_; + nebula::kvstore::KVStore *kvstore_; + nebula::thread::GenericThreadPool *pool_; +}; + +} // namespace meta +} // namespace nebula + +#endif // META_METAHTTPINGESTHANDLER_H + diff --git a/src/meta/MetaHttpStatusHandler.cpp b/src/meta/MetaHttpStatusHandler.cpp index 131699bfaa8..20a6976bc0d 100644 --- a/src/meta/MetaHttpStatusHandler.cpp +++ b/src/meta/MetaHttpStatusHandler.cpp @@ -52,7 +52,8 @@ void MetaHttpStatusHandler::onEOM() noexcept { switch (err_) { case HttpCode::E_UNSUPPORTED_METHOD: ResponseBuilder(downstream_) - .status(405, "Method Not Allowed") + .status(WebServiceUtils::to(HttpStatusCode::METHOD_NOT_ALLOWED), + WebServiceUtils::toString(HttpStatusCode::METHOD_NOT_ALLOWED)) .sendWithEOM(); return; default: @@ -62,12 +63,14 @@ void MetaHttpStatusHandler::onEOM() noexcept { folly::dynamic vals = getStatus(); if (returnJson_) { ResponseBuilder(downstream_) - .status(200, "OK") + .status(WebServiceUtils::to(HttpStatusCode::OK), + WebServiceUtils::toString(HttpStatusCode::OK)) .body(folly::toJson(vals)) .sendWithEOM(); } else { ResponseBuilder(downstream_) - .status(200, "OK") + .status(WebServiceUtils::to(HttpStatusCode::OK), + WebServiceUtils::toString(HttpStatusCode::OK)) .body(toStr(vals)) .sendWithEOM(); } diff --git a/src/meta/MetaHttpStatusHandler.h b/src/meta/MetaHttpStatusHandler.h index e80956cdaef..46fba4dfa36 100644 --- a/src/meta/MetaHttpStatusHandler.h +++ b/src/meta/MetaHttpStatusHandler.h @@ -9,8 +9,8 @@ #include "base/Base.h" #include "webservice/Common.h" -#include "proxygen/httpserver/RequestHandler.h" #include "kvstore/KVStore.h" +#include namespace nebula { namespace meta { diff --git a/src/meta/test/CMakeLists.txt b/src/meta/test/CMakeLists.txt index 90398ed0376..5f7753c812d 100644 --- a/src/meta/test/CMakeLists.txt +++ b/src/meta/test/CMakeLists.txt @@ -222,7 +222,9 @@ nebula_add_test( $ $ $ + $ $ + $ $ $ $ @@ -240,7 +242,6 @@ nebula_add_test( gtest ) - nebula_add_test( NAME meta_http_download_test @@ -260,7 +261,9 @@ nebula_add_test( $ $ $ + $ $ + $ $ $ $ @@ -278,6 +281,44 @@ nebula_add_test( gtest ) +nebula_add_test( + NAME + meta_http_ingest_test + SOURCES + MetaHttpIngestHandlerTest.cpp + OBJECTS + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + LIBRARIES + proxygenhttpserver + proxygenlib + ${ROCKSDB_LIBRARIES} + ${THRIFT_LIBRARIES} + wangle + gtest +) nebula_add_test( NAME balancer_test diff --git a/src/meta/test/MetaHttpDownloadHandlerTest.cpp b/src/meta/test/MetaHttpDownloadHandlerTest.cpp index 00bcb6f35e7..e7828b028e5 100644 --- a/src/meta/test/MetaHttpDownloadHandlerTest.cpp +++ b/src/meta/test/MetaHttpDownloadHandlerTest.cpp @@ -6,8 +6,8 @@ #include "base/Base.h" #include +#include "http/HttpClient.h" #include "webservice/WebService.h" -#include "webservice/test/TestUtils.h" #include "meta/MetaHttpDownloadHandler.h" #include "meta/test/MockHdfsHelper.h" #include "meta/test/TestUtils.h" @@ -34,14 +34,18 @@ class MetaHttpDownloadHandlerTestEnv : public ::testing::Environment { TestUtils::createSomeHosts(kv_.get()); TestUtils::assembleSpace(kv_.get(), 1, 2); + pool_ = std::make_unique(); + pool_->start(3); + WebService::registerHandler("/download-dispatch", [this] { auto handler = new meta::MetaHttpDownloadHandler(); - handler->init(kv_.get(), helper.get()); + handler->init(kv_.get(), helper.get(), pool_.get()); return handler; }); WebService::registerHandler("/download", [this] { auto handler = new storage::StorageHttpDownloadHandler(); - handler->init(helper.get()); + std::vector paths{rootPath_->path()}; + handler->init(helper.get(), pool_.get(), kv_.get(), paths); return handler; }); auto status = WebService::start(); @@ -52,32 +56,41 @@ class MetaHttpDownloadHandlerTestEnv : public ::testing::Environment { kv_.reset(); rootPath_.reset(); WebService::stop(); + pool_->stop(); VLOG(1) << "Web service stopped"; } private: std::unique_ptr rootPath_; std::unique_ptr kv_; + std::unique_ptr pool_; }; TEST(MetaHttpDownloadHandlerTest, MetaDownloadTest) { { - std::string resp; - ASSERT_TRUE(getUrl("/download-dispatch", resp)); - ASSERT_TRUE(resp.empty()); + auto url = "/download-dispatch"; + auto request = folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), + FLAGS_ws_http_port, url); + auto resp = http::HttpClient::get(request); + ASSERT_TRUE(resp.ok()); + ASSERT_TRUE(resp.value().empty()); } { - auto url = "/download-dispatch?host=127.0.0.1&port=9000&path=/data&local=/tmp&space=1"; - std::string resp; - ASSERT_TRUE(getUrl(url, resp)); - ASSERT_EQ("SSTFile dispatch successfully", resp); + auto url = "/download-dispatch?host=127.0.0.1&port=9000&path=/data&space=1"; + auto request = folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), + FLAGS_ws_http_port, url); + auto resp = http::HttpClient::get(request); + ASSERT_TRUE(resp.ok()); + ASSERT_EQ("SSTFile dispatch successfully", resp.value()); } { helper = std::make_unique(); - auto url = "/download-dispatch?host=127.0.0.1&port=9000&path=/data&local=/tmp&space=1"; - std::string resp; - ASSERT_TRUE(getUrl(url, resp)); - ASSERT_EQ("SSTFile dispatch failed", resp); + auto url = "/download-dispatch?host=127.0.0.1&port=9000&path=/data&space=1"; + auto request = folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), + FLAGS_ws_http_port, url); + auto resp = http::HttpClient::get(request); + ASSERT_TRUE(resp.ok()); + ASSERT_EQ("SSTFile dispatch failed", resp.value()); } } diff --git a/src/meta/test/MetaHttpIngestHandlerTest.cpp b/src/meta/test/MetaHttpIngestHandlerTest.cpp new file mode 100644 index 00000000000..7653607abad --- /dev/null +++ b/src/meta/test/MetaHttpIngestHandlerTest.cpp @@ -0,0 +1,114 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include "base/Base.h" +#include +#include "http/HttpClient.h" +#include "meta/MetaHttpIngestHandler.h" +#include "meta/test/TestUtils.h" +#include "storage/StorageHttpIngestHandler.h" +#include "webservice/WebService.h" +#include "fs/TempDir.h" +#include +#include "thread/GenericThreadPool.h" + +namespace nebula { +namespace meta { + +class MetaHttpIngestHandlerTestEnv : public ::testing::Environment { +public: + void SetUp() override { + FLAGS_ws_http_port = 12000; + FLAGS_ws_h2_port = 0; + VLOG(1) << "Starting web service..."; + + rootPath_ = std::make_unique("/tmp/MetaHttpIngestHandler.XXXXXX"); + kv_ = TestUtils::initKV(rootPath_->path()); + TestUtils::createSomeHosts(kv_.get()); + TestUtils::assembleSpace(kv_.get(), 1, 1); + pool_ = std::make_unique(); + pool_->start(1); + + WebService::registerHandler("/ingest-dispatch", [this] { + auto handler = new meta::MetaHttpIngestHandler(); + handler->init(kv_.get(), pool_.get()); + return handler; + }); + WebService::registerHandler("/ingest", [this] { + auto handler = new storage::StorageHttpIngestHandler(); + handler->init(kv_.get()); + return handler; + }); + auto status = WebService::start(); + ASSERT_TRUE(status.ok()) << status; + } + + void TearDown() override { + kv_.reset(); + rootPath_.reset(); + WebService::stop(); + pool_->stop(); + VLOG(1) << "Web service stopped"; + } + +private: + std::unique_ptr rootPath_; + std::unique_ptr kv_; + std::unique_ptr pool_; +}; + +TEST(MetaHttpIngestHandlerTest, MetaIngestTest) { + auto path = "/tmp/MetaHttpIngestData.XXXXXX"; + std::unique_ptr externalPath = std::make_unique(path); + auto partPath = folly::stringPrintf("%s/nebula/1/download/1", externalPath->path()); + ASSERT_TRUE(nebula::fs::FileUtils::makeDir(partPath)); + + auto options = rocksdb::Options(); + auto env = rocksdb::EnvOptions(); + rocksdb::SstFileWriter writer{env, options}; + auto sstPath = folly::stringPrintf("%s/data.sst", partPath.c_str()); + auto status = writer.Open(sstPath); + ASSERT_EQ(rocksdb::Status::OK(), status); + + for (auto i = 0; i < 10; i++) { + status = writer.Put(folly::stringPrintf("key_%d", i), + folly::stringPrintf("val_%d", i)); + ASSERT_EQ(rocksdb::Status::OK(), status); + } + status = writer.Finish(); + ASSERT_EQ(rocksdb::Status::OK(), status); + + { + auto url = "/ingest-dispatch"; + auto request = folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), + FLAGS_ws_http_port, url); + auto resp = http::HttpClient::get(request); + ASSERT_TRUE(resp.ok()); + ASSERT_TRUE(resp.value().empty()); + } + { + auto url = "/ingest-dispatch?space=0"; + auto request = folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), + FLAGS_ws_http_port, url); + auto resp = http::HttpClient::get(request); + ASSERT_TRUE(resp.ok()); + ASSERT_EQ("SSTFile ingest successfully", resp.value()); + } +} + +} // namespace meta +} // namespace nebula + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + folly::init(&argc, &argv, true); + google::SetStderrLogging(google::INFO); + + ::testing::AddGlobalTestEnvironment(new nebula::meta::MetaHttpIngestHandlerTestEnv()); + + return RUN_ALL_TESTS(); +} + diff --git a/src/meta/test/MetaHttpStatusHandlerTest.cpp b/src/meta/test/MetaHttpStatusHandlerTest.cpp index ce84c13645d..830ed25c3d3 100644 --- a/src/meta/test/MetaHttpStatusHandlerTest.cpp +++ b/src/meta/test/MetaHttpStatusHandlerTest.cpp @@ -7,8 +7,8 @@ #include "base/Base.h" #include #include +#include "http/HttpClient.h" #include "webservice/WebService.h" -#include "webservice/test/TestUtils.h" #include "meta/MetaHttpStatusHandler.h" #include "meta/test/TestUtils.h" #include "fs/TempDir.h" @@ -43,24 +43,37 @@ class MetaHttpStatusHandlerTestEnv : public ::testing::Environment { TEST(MetaHttpStatusHandlerTest, MetaStatusTest) { { - std::string resp; - ASSERT_TRUE(getUrl("/status", resp)); - ASSERT_EQ(std::string("status=running\n"), resp); + auto url = "/status"; + auto request = folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), + FLAGS_ws_http_port, url); + auto resp = http::HttpClient::get(request); + ASSERT_TRUE(resp.ok()); + ASSERT_EQ("status=running\n", resp.value()); } { - std::string resp; - ASSERT_TRUE(getUrl("", resp)); - ASSERT_EQ(std::string("status=running\n"), resp); + auto url = ""; + auto request = folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), + FLAGS_ws_http_port, url); + auto resp = http::HttpClient::get(request); + ASSERT_TRUE(resp.ok()); + ASSERT_EQ("status=running\n", resp.value()); } { - std::string resp; - ASSERT_TRUE(getUrl("/status?daemon=status", resp)); - ASSERT_EQ(std::string("status=running\n"), resp); + auto url = "/status?daemon=status"; + auto request = folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), + FLAGS_ws_http_port, url); + auto resp = http::HttpClient::get(request); + ASSERT_TRUE(resp.ok()); + ASSERT_EQ("status=running\n", resp.value()); } { - std::string resp; - ASSERT_TRUE(getUrl("/status?daemon=status&returnjson", resp)); - auto json = folly::parseJson(resp); + auto url = "/status?daemon=status&returnjson"; + auto request = folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), + FLAGS_ws_http_port, url); + auto resp = http::HttpClient::get(request); + ASSERT_TRUE(resp.ok()); + + auto json = folly::parseJson(resp.value()); ASSERT_TRUE(json.isArray()); ASSERT_EQ(1UL, json.size()); ASSERT_TRUE(json[0].isObject()); @@ -77,9 +90,12 @@ TEST(MetaHttpStatusHandlerTest, MetaStatusTest) { ASSERT_EQ("running", it->second.getString()); } { - std::string resp; - ASSERT_TRUE(getUrl("/status123?daemon=status", resp)); - ASSERT_TRUE(resp.empty()); + auto url = "/status123?deamon=status"; + auto request = folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), + FLAGS_ws_http_port, url); + auto resp = http::HttpClient::get(request); + ASSERT_TRUE(resp.ok()); + ASSERT_TRUE(resp.value().empty()); } } diff --git a/src/meta/test/TestUtils.h b/src/meta/test/TestUtils.h index 9187d8d1399..ac989e966a7 100644 --- a/src/meta/test/TestUtils.h +++ b/src/meta/test/TestUtils.h @@ -44,6 +44,9 @@ class TestUtils { // 0 => {0} auto& partsMap = partMan->partsMap(); partsMap[0][0] = PartMeta(); + // 1 => {1,2} + partsMap[1][1] = PartMeta(); + partsMap[1][2] = PartMeta(); std::vector paths; paths.emplace_back(folly::stringPrintf("%s/disk1", rootPath)); diff --git a/src/parser/MutateSentences.cpp b/src/parser/MutateSentences.cpp index fa79b520055..cd6c44bc7c9 100644 --- a/src/parser/MutateSentences.cpp +++ b/src/parser/MutateSentences.cpp @@ -266,8 +266,12 @@ std::string DeleteEdgeSentence::toString() const { } std::string DownloadSentence::toString() const { - return folly::stringPrintf("DOWNLOAD HDFS \"%s:%d/%s\" TO \"%s\"", host_.get()->c_str(), - port_, path_.get()->c_str(), localPath_.get()->c_str()); + return folly::stringPrintf("DOWNLOAD HDFS \"%s:%d/%s\"", host_.get()->c_str(), + port_, path_.get()->c_str()); +} + +std::string IngestSentence::toString() const { + return "INGEST"; } } // namespace nebula diff --git a/src/parser/MutateSentences.h b/src/parser/MutateSentences.h index 1bc66480e4a..47d01717b31 100644 --- a/src/parser/MutateSentences.h +++ b/src/parser/MutateSentences.h @@ -253,6 +253,7 @@ class InsertEdgeSentence final : public Sentence { InsertEdgeSentence() { kind_ = Kind::kInsertEdge; } + void setOverwrite(bool overwritable) { overwritable_ = overwritable; } @@ -473,6 +474,7 @@ class DeleteEdgeSentence final : public Sentence { std::unique_ptr whereClause_; }; + class DownloadSentence final : public Sentence { public: DownloadSentence() { @@ -503,14 +505,6 @@ class DownloadSentence final : public Sentence { path_.reset(path); } - const std::string* localPath() const { - return localPath_.get(); - } - - void setLocalPath(std::string *localPath) { - localPath_.reset(localPath); - } - void setUrl(std::string *url) { static std::string hdfsPrefix = "hdfs://"; if (url->find(hdfsPrefix) != 0) { @@ -548,9 +542,19 @@ class DownloadSentence final : public Sentence { std::unique_ptr host_; int32_t port_; std::unique_ptr path_; - std::unique_ptr localPath_; }; + +class IngestSentence final : public Sentence { +public: + IngestSentence() { + kind_ = Kind::kIngest; + } + + std::string toString() const override; +}; + + } // namespace nebula #endif // PARSER_MUTATESENTENCES_H_ diff --git a/src/parser/parser.yy b/src/parser/parser.yy index a3edbf24ea6..f8cde663dfe 100644 --- a/src/parser/parser.yy +++ b/src/parser/parser.yy @@ -92,7 +92,7 @@ class GraphScanner; %token KW_EDGE KW_EDGES KW_UPDATE KW_STEPS KW_OVER KW_UPTO KW_REVERSELY KW_SPACE KW_DELETE KW_FIND %token KW_INT KW_BIGINT KW_DOUBLE KW_STRING KW_BOOL KW_TAG KW_TAGS KW_UNION KW_INTERSECT KW_MINUS %token KW_NO KW_OVERWRITE KW_IN KW_DESCRIBE KW_DESC KW_SHOW KW_HOSTS KW_TIMESTAMP KW_ADD -%token KW_PARTITION_NUM KW_REPLICA_FACTOR KW_DROP KW_REMOVE KW_SPACES +%token KW_PARTITION_NUM KW_REPLICA_FACTOR KW_DROP KW_REMOVE KW_SPACES KW_INGEST %token KW_IF KW_NOT KW_EXISTS KW_WITH KW_FIRSTNAME KW_LASTNAME KW_EMAIL KW_PHONE KW_USER KW_USERS %token KW_PASSWORD KW_CHANGE KW_ROLE KW_GOD KW_ADMIN KW_GUEST KW_GRANT KW_REVOKE KW_ON %token KW_ROLES KW_BY KW_DOWNLOAD KW_HDFS @@ -179,6 +179,7 @@ class GraphScanner; %type traverse_sentence set_sentence piped_sentence assignment_sentence %type maintain_sentence insert_vertex_sentence insert_edge_sentence %type mutate_sentence update_vertex_sentence update_edge_sentence delete_vertex_sentence delete_edge_sentence +%type ingest_sentence %type show_sentence add_hosts_sentence remove_hosts_sentence create_space_sentence describe_space_sentence %type drop_space_sentence %type yield_sentence @@ -1062,10 +1063,9 @@ delete_vertex_sentence ; download_sentence - : KW_DOWNLOAD KW_HDFS STRING KW_TO STRING { + : KW_DOWNLOAD KW_HDFS STRING { auto sentence = new DownloadSentence(); sentence->setUrl($3); - sentence->setLocalPath($5); $$ = sentence; } ; @@ -1089,6 +1089,13 @@ delete_edge_sentence } ; +ingest_sentence + : KW_INGEST { + auto sentence = new IngestSentence(); + $$ = sentence; + } + ; + show_sentence : KW_SHOW KW_HOSTS { $$ = new ShowSentence(ShowSentence::ShowType::kShowHosts); @@ -1377,6 +1384,7 @@ mutate_sentence | delete_vertex_sentence { $$ = $1; } | delete_edge_sentence { $$ = $1; } | download_sentence { $$ = $1; } + | ingest_sentence { $$ = $1; } ; maintain_sentence diff --git a/src/parser/scanner.lex b/src/parser/scanner.lex index fea98465c4e..4825eee32d9 100644 --- a/src/parser/scanner.lex +++ b/src/parser/scanner.lex @@ -102,6 +102,7 @@ TTL_COL ([Tt][Tt][Ll][_][Cc][Oo][Ll]) DOWNLOAD ([Dd][Oo][Ww][Nn][Ll][Oo][Aa][Dd]) HDFS ([Hh][Dd][Ff][Ss]) ORDER ([Oo][Rr][Dd][Ee][Rr]) +INGEST ([Ii][Nn][Gg][Ee][Ss][Tt]) ASC ([Aa][Ss][Cc]) DISTINCT ([Dd][Ii][Ss][Tt][Ii][Nn][Cc][Tt]) VARIABLES ([Vv][Aa][Rr][Ii][Aa][Bb][Ll][Ee][Ss]) @@ -205,6 +206,7 @@ IP_OCTET ([0-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5]) {TRUE} { yylval->boolval = true; return TokenType::BOOL; } {FALSE} { yylval->boolval = false; return TokenType::BOOL; } {ORDER} { return TokenType::KW_ORDER; } +{INGEST} { return TokenType::KW_INGEST; } {ASC} { return TokenType::KW_ASC; } {DISTINCT} { return TokenType::KW_DISTINCT; } diff --git a/src/parser/test/ParserTest.cpp b/src/parser/test/ParserTest.cpp index fc5540f1de9..de8f30b7807 100644 --- a/src/parser/test/ParserTest.cpp +++ b/src/parser/test/ParserTest.cpp @@ -936,10 +936,16 @@ TEST(Parser, Annotation) { } } -TEST(Parser, DownloadLoad) { +TEST(Parser, DownloadAndIngest) { { GQLParser parser; - std::string query = "DOWNLOAD HDFS \"hdfs://127.0.0.1:9090/data\" TO \"/tmp\""; + std::string query = "DOWNLOAD HDFS \"hdfs://127.0.0.1:9090/data\""; + auto result = parser.parse(query); + ASSERT_TRUE(result.ok()) << result.status(); + } + { + GQLParser parser; + std::string query = "INGEST"; auto result = parser.parse(query); ASSERT_TRUE(result.ok()) << result.status(); } diff --git a/src/parser/test/ScannerTest.cpp b/src/parser/test/ScannerTest.cpp index c514f5ad14b..7dbf0126df2 100644 --- a/src/parser/test/ScannerTest.cpp +++ b/src/parser/test/ScannerTest.cpp @@ -352,6 +352,9 @@ TEST(Scanner, Basic) { CHECK_SEMANTIC_TYPE("ASC", TokenType::KW_ASC), CHECK_SEMANTIC_TYPE("Asc", TokenType::KW_ASC), CHECK_SEMANTIC_TYPE("asc", TokenType::KW_ASC), + CHECK_SEMANTIC_TYPE("INGEST", TokenType::KW_INGEST), + CHECK_SEMANTIC_TYPE("Ingest", TokenType::KW_INGEST), + CHECK_SEMANTIC_TYPE("ingest", TokenType::KW_INGEST), CHECK_SEMANTIC_TYPE("VARIABLES", TokenType::KW_VARIABLES), CHECK_SEMANTIC_TYPE("variables", TokenType::KW_VARIABLES), CHECK_SEMANTIC_TYPE("Variables", TokenType::KW_VARIABLES), diff --git a/src/storage/CMakeLists.txt b/src/storage/CMakeLists.txt index 22e69b8d600..0999f55d627 100644 --- a/src/storage/CMakeLists.txt +++ b/src/storage/CMakeLists.txt @@ -12,6 +12,7 @@ nebula_add_library( nebula_add_library( storage_http_handler OBJECT + StorageHttpIngestHandler.cpp StorageHttpStatusHandler.cpp StorageHttpDownloadHandler.cpp StorageHttpAdminHandler.cpp diff --git a/src/storage/StorageHttpDownloadHandler.cpp b/src/storage/StorageHttpDownloadHandler.cpp index 32fb3918e01..d0a2a43ec30 100644 --- a/src/storage/StorageHttpDownloadHandler.cpp +++ b/src/storage/StorageHttpDownloadHandler.cpp @@ -7,7 +7,9 @@ #include "storage/StorageHttpDownloadHandler.h" #include "webservice/Common.h" #include "process/ProcessUtils.h" +#include "fs/FileUtils.h" #include "hdfs/HdfsHelper.h" +#include "kvstore/Part.h" #include "thread/GenericThreadPool.h" #include #include @@ -19,18 +21,24 @@ DEFINE_int32(download_thread_num, 3, "download thread number"); namespace nebula { namespace storage { -static std::atomic_flag isRunning = ATOMIC_FLAG_INIT; -std::once_flag poolStartFlag; - using proxygen::HTTPMessage; using proxygen::HTTPMethod; using proxygen::ProxygenError; using proxygen::UpgradeProtocol; using proxygen::ResponseBuilder; -void StorageHttpDownloadHandler::init(nebula::hdfs::HdfsHelper *helper) { +void StorageHttpDownloadHandler::init(nebula::hdfs::HdfsHelper *helper, + nebula::thread::GenericThreadPool *pool, + nebula::kvstore::KVStore *kvstore, + std::vector paths) { helper_ = helper; + pool_ = pool; + kvstore_ = kvstore; + paths_ = paths; CHECK_NOTNULL(helper_); + CHECK_NOTNULL(pool_); + CHECK_NOTNULL(kvstore_); + CHECK(!paths_.empty()); } void StorageHttpDownloadHandler::onRequest(std::unique_ptr headers) noexcept { @@ -44,16 +52,24 @@ void StorageHttpDownloadHandler::onRequest(std::unique_ptr headers) !headers->hasQueryParam("port") || !headers->hasQueryParam("path") || !headers->hasQueryParam("parts") || - !headers->hasQueryParam("local")) { + !headers->hasQueryParam("space")) { LOG(ERROR) << "Illegal Argument"; err_ = HttpCode::E_ILLEGAL_ARGUMENT; return; } + hdfsHost_ = headers->getQueryParam("host"); hdfsPort_ = headers->getIntQueryParam("port"); - partitions_ = headers->getQueryParam("parts"); hdfsPath_ = headers->getQueryParam("path"); - localPath_ = headers->getQueryParam("local"); + partitions_ = headers->getQueryParam("parts"); + spaceID_ = headers->getIntQueryParam("space"); + + for (auto &path : paths_) { + auto downloadPath = folly::stringPrintf("%s/nebula/%d/download", path.c_str(), spaceID_); + if (fs::FileUtils::fileType(downloadPath.c_str()) == fs::FileType::NOTEXIST) { + fs::FileUtils::makeDir(downloadPath); + } + } } @@ -66,12 +82,14 @@ void StorageHttpDownloadHandler::onEOM() noexcept { switch (err_) { case HttpCode::E_UNSUPPORTED_METHOD: ResponseBuilder(downstream_) - .status(405, "Method Not Allowed") + .status(WebServiceUtils::to(HttpStatusCode::METHOD_NOT_ALLOWED), + WebServiceUtils::toString(HttpStatusCode::METHOD_NOT_ALLOWED)) .sendWithEOM(); return; case HttpCode::E_ILLEGAL_ARGUMENT: ResponseBuilder(downstream_) - .status(400, "Bad Request") + .status(WebServiceUtils::to(HttpStatusCode::BAD_REQUEST), + WebServiceUtils::toString(HttpStatusCode::BAD_REQUEST)) .sendWithEOM(); return; default: @@ -88,21 +106,24 @@ void StorageHttpDownloadHandler::onEOM() noexcept { .sendWithEOM(); } - if (downloadSSTFiles(hdfsHost_, hdfsPort_, hdfsPath_, parts, localPath_)) { + if (downloadSSTFiles(hdfsHost_, hdfsPort_, hdfsPath_, parts)) { ResponseBuilder(downstream_) - .status(200, "SSTFile download successfully") + .status(WebServiceUtils::to(HttpStatusCode::OK), + WebServiceUtils::toString(HttpStatusCode::OK)) .body("SSTFile download successfully") .sendWithEOM(); } else { ResponseBuilder(downstream_) - .status(404, "SSTFile download failed") + .status(WebServiceUtils::to(HttpStatusCode::FORBIDDEN), + WebServiceUtils::toString(HttpStatusCode::FORBIDDEN)) .body("SSTFile download failed") .sendWithEOM(); } } else { LOG(ERROR) << "HADOOP_HOME not exist"; ResponseBuilder(downstream_) - .status(404, "HADOOP_HOME not exist") + .status(WebServiceUtils::to(HttpStatusCode::NOT_FOUND), + WebServiceUtils::toString(HttpStatusCode::NOT_FOUND)) .sendWithEOM(); } } @@ -126,45 +147,44 @@ void StorageHttpDownloadHandler::onError(ProxygenError error) noexcept { bool StorageHttpDownloadHandler::downloadSSTFiles(const std::string& hdfsHost, int32_t hdfsPort, const std::string& hdfsPath, - const std::vector& parts, - const std::string& localPath) { + const std::vector& parts) { + static std::atomic_flag isRunning = ATOMIC_FLAG_INIT; if (isRunning.test_and_set()) { LOG(ERROR) << "Download is not completed"; return false; } std::vector> futures; - static nebula::thread::GenericThreadPool pool; - std::call_once(poolStartFlag, []() { - LOG(INFO) << "Download Thread Pool start"; - pool.start(FLAGS_download_thread_num); - }); for (auto& part : parts) { - auto downloader = [hdfsHost, hdfsPort, hdfsPath, localPath, part, this]() { - int32_t partInt; - try { - partInt = folly::to(part); - } catch (const std::exception& ex) { - LOG(ERROR) << "Invalid part: \"" << part << "\""; + PartitionID partId; + try { + partId = folly::to(part); + } catch (const std::exception& ex) { + isRunning.clear(); + LOG(ERROR) << "Invalid part: \"" << part << "\""; + return false; + } + + auto downloader = [hdfsHost, hdfsPort, hdfsPath, partId, this]() { + auto hdfsPartPath = folly::stringPrintf("%s/%d", hdfsPath.c_str(), partId); + auto partResult = kvstore_->part(spaceID_, partId); + if (!ok(partResult)) { + LOG(ERROR) << "Can't found space: " << spaceID_ << ", part: " << partId; return false; } - auto hdfsPartPath = folly::stringPrintf("%s/%d", hdfsPath.c_str(), partInt); + auto localPath = folly::stringPrintf("%s/download/", + value(partResult)->engine()->getDataRoot()); auto result = this->helper_->copyToLocal(hdfsHost, hdfsPort, hdfsPartPath, localPath); - if (!result.ok() || !result.value().empty()) { - LOG(ERROR) << "Download SSTFile Failed"; - return false; - } else { - return true; - } + return result.ok() && result.value().empty(); }; - auto future = pool.addTask(downloader); + auto future = pool_->addTask(downloader); futures.push_back(std::move(future)); } - std::atomic successfully{true}; + bool successfully{true}; folly::collectAll(futures).then([&](const std::vector>& tries) { for (const auto& t : tries) { if (t.hasException()) { diff --git a/src/storage/StorageHttpDownloadHandler.h b/src/storage/StorageHttpDownloadHandler.h index 044b7fcec5d..ac4a36691f7 100644 --- a/src/storage/StorageHttpDownloadHandler.h +++ b/src/storage/StorageHttpDownloadHandler.h @@ -10,7 +10,9 @@ #include "base/Base.h" #include "webservice/Common.h" #include "hdfs/HdfsHelper.h" -#include "proxygen/httpserver/RequestHandler.h" +#include "kvstore/KVStore.h" +#include "thread/GenericThreadPool.h" +#include namespace nebula { namespace storage { @@ -21,11 +23,14 @@ class StorageHttpDownloadHandler : public proxygen::RequestHandler { public: StorageHttpDownloadHandler() = default; - void init(nebula::hdfs::HdfsHelper *helper); + void init(nebula::hdfs::HdfsHelper *helper, + nebula::thread::GenericThreadPool *pool, + nebula::kvstore::KVStore *kvstore, + std::vector paths); void onRequest(std::unique_ptr headers) noexcept override; - void onBody(std::unique_ptr body) noexcept override; + void onBody(std::unique_ptr body) noexcept override; void onEOM() noexcept override; @@ -39,18 +44,20 @@ class StorageHttpDownloadHandler : public proxygen::RequestHandler { bool downloadSSTFiles(const std::string& url, int port, const std::string& path, - const std::vector& parts, - const std::string& local); + const std::vector& parts); private: HttpCode err_{HttpCode::SUCCEEDED}; + GraphSpaceID spaceID_; std::string hdfsHost_; int32_t hdfsPort_; std::string hdfsPath_; std::string partitions_; - std::string localPath_; nebula::hdfs::HdfsHelper *helper_; + nebula::thread::GenericThreadPool *pool_; + nebula::kvstore::KVStore *kvstore_; + std::vector paths_; }; } // namespace storage diff --git a/src/storage/StorageHttpIngestHandler.cpp b/src/storage/StorageHttpIngestHandler.cpp new file mode 100644 index 00000000000..0c8ec51f849 --- /dev/null +++ b/src/storage/StorageHttpIngestHandler.cpp @@ -0,0 +1,105 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include "storage/StorageHttpIngestHandler.h" +#include +#include +#include + +namespace nebula { +namespace storage { + +using proxygen::HTTPMessage; +using proxygen::HTTPMethod; +using proxygen::ProxygenError; +using proxygen::UpgradeProtocol; +using proxygen::ResponseBuilder; + +void StorageHttpIngestHandler::init(nebula::kvstore::KVStore *kvstore) { + kvstore_ = kvstore; + CHECK_NOTNULL(kvstore_); +} + +void StorageHttpIngestHandler::onRequest(std::unique_ptr headers) noexcept { + if (headers->getMethod().value() != HTTPMethod::GET) { + // Unsupported method + err_ = HttpCode::E_UNSUPPORTED_METHOD; + return; + } + + if (!headers->hasQueryParam("space")) { + err_ = HttpCode::E_ILLEGAL_ARGUMENT; + return; + } + + space_ = headers->getIntQueryParam("space"); +} + +void StorageHttpIngestHandler::onBody(std::unique_ptr) noexcept { + // Do nothing, we only support GET +} + +void StorageHttpIngestHandler::onEOM() noexcept { + switch (err_) { + case HttpCode::E_UNSUPPORTED_METHOD: + ResponseBuilder(downstream_) + .status(WebServiceUtils::to(HttpStatusCode::METHOD_NOT_ALLOWED), + WebServiceUtils::toString(HttpStatusCode::METHOD_NOT_ALLOWED)) + .sendWithEOM(); + return; + case HttpCode::E_ILLEGAL_ARGUMENT: + ResponseBuilder(downstream_) + .status(WebServiceUtils::to(HttpStatusCode::BAD_REQUEST), + WebServiceUtils::toString(HttpStatusCode::BAD_REQUEST)) + .sendWithEOM(); + return; + default: + break; + } + + if (ingestSSTFiles(space_)) { + LOG(ERROR) << "SSTFile ingest successfully "; + ResponseBuilder(downstream_) + .status(WebServiceUtils::to(HttpStatusCode::OK), + WebServiceUtils::toString(HttpStatusCode::OK)) + .body("SSTFile ingest successfully") + .sendWithEOM(); + } else { + LOG(ERROR) << "SSTFile ingest failed"; + ResponseBuilder(downstream_) + .status(WebServiceUtils::to(HttpStatusCode::FORBIDDEN), + WebServiceUtils::toString(HttpStatusCode::FORBIDDEN)) + .body("SSTFile ingest failed") + .sendWithEOM(); + } +} + +void StorageHttpIngestHandler::onUpgrade(UpgradeProtocol) noexcept { + // Do nothing +} + + +void StorageHttpIngestHandler::requestComplete() noexcept { + delete this; +} + +void StorageHttpIngestHandler::onError(ProxygenError error) noexcept { + LOG(ERROR) << "Web Service MetaHttpIngestHandler Failed: " + << proxygen::getErrorString(error); +} + +bool StorageHttpIngestHandler::ingestSSTFiles(GraphSpaceID space) { + auto code = kvstore_->ingest(space); + if (code == kvstore::ResultCode::SUCCEEDED) { + return true; + } else { + LOG(ERROR) << "SSTFile Ingest Failed: " << code; + return false; + } +} + +} // namespace storage +} // namespace nebula diff --git a/src/storage/StorageHttpIngestHandler.h b/src/storage/StorageHttpIngestHandler.h new file mode 100644 index 00000000000..a09b52e966d --- /dev/null +++ b/src/storage/StorageHttpIngestHandler.h @@ -0,0 +1,50 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef STORAGE_STORAGEHTTPINGESTHANDLER_H +#define STORAGE_STORAGEHTTPINGESTHANDLER_H + +#include "base/Base.h" +#include "webservice/Common.h" +#include "kvstore/KVStore.h" +#include + +namespace nebula { +namespace storage { + +using nebula::HttpCode; + +class StorageHttpIngestHandler : public proxygen::RequestHandler { +public: + StorageHttpIngestHandler() = default; + + void init(nebula::kvstore::KVStore *kvstore); + + void onRequest(std::unique_ptr headers) noexcept override; + + void onBody(std::unique_ptr body) noexcept override; + + void onEOM() noexcept override; + + void onUpgrade(proxygen::UpgradeProtocol protocol) noexcept override; + + void requestComplete() noexcept override; + + void onError(proxygen::ProxygenError error) noexcept override; + + bool ingestSSTFiles(GraphSpaceID space); + +private: + HttpCode err_{HttpCode::SUCCEEDED}; + nebula::kvstore::KVStore *kvstore_; + GraphSpaceID space_; +}; + +} // namespace storage +} // namespace nebula + +#endif // STORAGE_STORAGEHTTPINGESTHANDLER_H + diff --git a/src/storage/StorageHttpStatusHandler.cpp b/src/storage/StorageHttpStatusHandler.cpp index a0b33ac5f84..9cc941f5c10 100644 --- a/src/storage/StorageHttpStatusHandler.cpp +++ b/src/storage/StorageHttpStatusHandler.cpp @@ -47,7 +47,8 @@ void StorageHttpStatusHandler::onEOM() noexcept { switch (err_) { case HttpCode::E_UNSUPPORTED_METHOD: ResponseBuilder(downstream_) - .status(405, "Method Not Allowed") + .status(WebServiceUtils::to(HttpStatusCode::METHOD_NOT_ALLOWED), + WebServiceUtils::toString(HttpStatusCode::METHOD_NOT_ALLOWED)) .sendWithEOM(); return; default: @@ -57,12 +58,14 @@ void StorageHttpStatusHandler::onEOM() noexcept { folly::dynamic vals = getStatus(); if (returnJson_) { ResponseBuilder(downstream_) - .status(200, "OK") + .status(WebServiceUtils::to(HttpStatusCode::OK), + WebServiceUtils::toString(HttpStatusCode::OK)) .body(folly::toJson(vals)) .sendWithEOM(); } else { ResponseBuilder(downstream_) - .status(200, "OK") + .status(WebServiceUtils::to(HttpStatusCode::OK), + WebServiceUtils::toString(HttpStatusCode::OK)) .body(toStr(vals)) .sendWithEOM(); } diff --git a/src/storage/StorageHttpStatusHandler.h b/src/storage/StorageHttpStatusHandler.h index 279a2a7f053..5de7df9f39f 100644 --- a/src/storage/StorageHttpStatusHandler.h +++ b/src/storage/StorageHttpStatusHandler.h @@ -10,7 +10,7 @@ #include "base/Base.h" #include "webservice/Common.h" #include "kvstore/KVStore.h" -#include "proxygen/httpserver/RequestHandler.h" +#include namespace nebula { namespace storage { diff --git a/src/storage/test/CMakeLists.txt b/src/storage/test/CMakeLists.txt index 082b784a745..287cc7763b0 100644 --- a/src/storage/test/CMakeLists.txt +++ b/src/storage/test/CMakeLists.txt @@ -76,7 +76,11 @@ nebula_add_test( nebula_add_test( NAME query_stats_test SOURCES QueryStatsTest.cpp - OBJECTS $ ${storage_test_deps} + OBJECTS + $ + $ + $ + ${storage_test_deps} LIBRARIES ${ROCKSDB_LIBRARIES} ${THRIFT_LIBRARIES} wangle gtest ) @@ -106,6 +110,7 @@ nebula_add_test( StorageHttpStatusHandlerTest.cpp OBJECTS $ + $ $ $ $ @@ -128,6 +133,7 @@ nebula_add_test( StorageHttpAdminHandlerTest.cpp OBJECTS $ + $ $ $ $ @@ -150,6 +156,30 @@ nebula_add_test( StorageHttpDownloadHandlerTest.cpp OBJECTS $ + $ + $ + $ + $ + $ + $ + ${storage_test_deps} + LIBRARIES + proxygenhttpserver + proxygenlib + ${ROCKSDB_LIBRARIES} + ${THRIFT_LIBRARIES} + wangle + gtest +) + +nebula_add_test( + NAME + storage_http_ingest_test + SOURCES + StorageHttpIngestHandlerTest.cpp + OBJECTS + $ + $ $ $ $ diff --git a/src/storage/test/CompactionTest.cpp b/src/storage/test/CompactionTest.cpp index e3e68c37644..3670dde21d2 100644 --- a/src/storage/test/CompactionTest.cpp +++ b/src/storage/test/CompactionTest.cpp @@ -79,6 +79,7 @@ TEST(NebulaCompactionFilterTest, InvalidSchemaAndMutliVersionsFilterTest) { std::shared_ptr cfFactory( new NebulaCompactionFilterFactory(schemaMan.get())); std::unique_ptr kv(TestUtils::initKV(rootPath.path(), + 6, {0, 0}, nullptr, false, diff --git a/src/storage/test/StorageHttpAdminHandlerTest.cpp b/src/storage/test/StorageHttpAdminHandlerTest.cpp index d2da1ac3a85..5f58a2d163f 100644 --- a/src/storage/test/StorageHttpAdminHandlerTest.cpp +++ b/src/storage/test/StorageHttpAdminHandlerTest.cpp @@ -51,34 +51,52 @@ class StorageHttpAdminHandlerTestEnv : public ::testing::Environment { TEST(StoragehHttpAdminHandlerTest, AdminTest) { { - std::string resp; - ASSERT_TRUE(getUrl("/admin", resp)); - ASSERT_EQ(0, resp.find("Space should not be empty")); + auto url = "/admin"; + auto request = folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), + FLAGS_ws_http_port, url); + auto resp = http::HttpClient::get(request); + ASSERT_TRUE(resp.ok()); + ASSERT_EQ(0, resp.value().find("Space should not be empty")); } { - std::string resp; - ASSERT_TRUE(getUrl("/admin?space=xx", resp)); - ASSERT_EQ(0, resp.find("Op should not be empty")); + auto url = "/admin?space=xx"; + auto request = folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), + FLAGS_ws_http_port, url); + auto resp = http::HttpClient::get(request); + ASSERT_TRUE(resp.ok()); + ASSERT_EQ(0, resp.value().find("Op should not be empty")); } { - std::string resp; - ASSERT_TRUE(getUrl("/admin?space=xx&op=yy", resp)); - ASSERT_EQ(0, resp.find("Can't find space xx")); + auto url = "/admin?space=xx&op=yy"; + auto request = folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), + FLAGS_ws_http_port, url); + auto resp = http::HttpClient::get(request); + ASSERT_TRUE(resp.ok()); + ASSERT_EQ(0, resp.value().find("Can't find space xx")); } { - std::string resp; - ASSERT_TRUE(getUrl("/admin?space=0&op=yy", resp)); - ASSERT_EQ(0, resp.find("Unknown operation yy")); + auto url = "/admin?space=0&op=yy"; + auto request = folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), + FLAGS_ws_http_port, url); + auto resp = http::HttpClient::get(request); + ASSERT_TRUE(resp.ok()); + ASSERT_EQ(0, resp.value().find("Unknown operation yy")); } { - std::string resp; - ASSERT_TRUE(getUrl("/admin?space=0&op=flush", resp)); - ASSERT_EQ("ok", resp); + auto url = "/admin?space=0&op=flush"; + auto request = folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), + FLAGS_ws_http_port, url); + auto resp = http::HttpClient::get(request); + ASSERT_TRUE(resp.ok()); + ASSERT_EQ("ok", resp.value()); } { - std::string resp; - ASSERT_TRUE(getUrl("/admin?space=0&op=compact", resp)); - ASSERT_EQ("ok", resp); + auto url = "/admin?space=0&op=compact"; + auto request = folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), + FLAGS_ws_http_port, url); + auto resp = http::HttpClient::get(request); + ASSERT_TRUE(resp.ok()); + ASSERT_EQ("ok", resp.value()); } } diff --git a/src/storage/test/StorageHttpDownloadHandlerTest.cpp b/src/storage/test/StorageHttpDownloadHandlerTest.cpp index 6f079141943..7cd254321e5 100644 --- a/src/storage/test/StorageHttpDownloadHandlerTest.cpp +++ b/src/storage/test/StorageHttpDownloadHandlerTest.cpp @@ -6,8 +6,8 @@ #include "base/Base.h" #include +#include "http/HttpClient.h" #include "webservice/WebService.h" -#include "webservice/test/TestUtils.h" #include "storage/StorageHttpDownloadHandler.h" #include "storage/test/MockHdfsHelper.h" #include "storage/test/TestUtils.h" @@ -26,10 +26,18 @@ class StorageHttpDownloadHandlerTestEnv : public ::testing::Environment { void SetUp() override { FLAGS_ws_http_port = 0; FLAGS_ws_h2_port = 0; + + rootPath_ = std::make_unique("/tmp/StorageHttpDownloadHandler.XXXXXX"); + kv_ = TestUtils::initKV(rootPath_->path()); + + pool_ = std::make_unique(); + pool_->start(1); + VLOG(1) << "Starting web service..."; - WebService::registerHandler("/download", [] { + WebService::registerHandler("/download", [this] { auto handler = new storage::StorageHttpDownloadHandler(); - handler->init(helper.get()); + std::vector paths{rootPath_->path()}; + handler->init(helper.get(), pool_.get(), kv_.get(), paths); return handler; }); auto status = WebService::start(); @@ -37,35 +45,52 @@ class StorageHttpDownloadHandlerTestEnv : public ::testing::Environment { } void TearDown() override { + kv_.reset(); + rootPath_.reset(); WebService::stop(); + pool_->stop(); VLOG(1) << "Web service stopped"; } + +private: + std::unique_ptr rootPath_; + std::unique_ptr kv_; + std::unique_ptr pool_; }; TEST(StorageHttpDownloadHandlerTest, StorageDownloadTest) { { - std::string resp; - ASSERT_TRUE(getUrl("/download", resp)); - ASSERT_TRUE(resp.empty()); + auto url = "/download"; + auto request = folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), + FLAGS_ws_http_port, url); + auto resp = http::HttpClient::get(request); + ASSERT_TRUE(resp.ok()); + ASSERT_TRUE(resp.value().empty()); } { - auto url = "/download?host=127.0.0.1&port=9000&path=/data&parts=1&local=/tmp"; - std::string resp; - ASSERT_TRUE(getUrl(url, resp)); - ASSERT_EQ("SSTFile download successfully", resp); + auto url = "/download?host=127.0.0.1&port=9000&path=/data&parts=1&space=0"; + auto request = folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), + FLAGS_ws_http_port, url); + auto resp = http::HttpClient::get(request); + ASSERT_TRUE(resp.ok()); + ASSERT_EQ("SSTFile download successfully", resp.value()); } { - auto url = "/download?host=127.0.0.1&port=9000&path=/data&parts=illegal-part&local=/tmp"; - std::string resp; - ASSERT_TRUE(getUrl(url, resp)); - ASSERT_EQ("SSTFile download failed", resp); + auto url = "/download?host=127.0.0.1&port=9000&path=/data&parts=illegal-part&space=0"; + auto request = folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), + FLAGS_ws_http_port, url); + auto resp = http::HttpClient::get(request); + ASSERT_TRUE(resp.ok()); + ASSERT_EQ("SSTFile download failed", resp.value()); } { helper = std::make_unique(); - auto url = "/download?host=127.0.0.1&port=9000&path=/data&parts=1&local=/tmp"; - std::string resp; - ASSERT_TRUE(getUrl(url, resp)); - ASSERT_EQ("SSTFile download failed", resp); + auto url = "/download?host=127.0.0.1&port=9000&path=/data&parts=1&space=0"; + auto request = folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), + FLAGS_ws_http_port, url); + auto resp = http::HttpClient::get(request); + ASSERT_TRUE(resp.ok()); + ASSERT_EQ("SSTFile download failed", resp.value()); } } diff --git a/src/storage/test/StorageHttpIngestHandlerTest.cpp b/src/storage/test/StorageHttpIngestHandlerTest.cpp new file mode 100644 index 00000000000..b40478f650e --- /dev/null +++ b/src/storage/test/StorageHttpIngestHandlerTest.cpp @@ -0,0 +1,100 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include "base/Base.h" +#include "fs/TempDir.h" +#include "http/HttpClient.h" +#include "webservice/WebService.h" +#include "storage/test/TestUtils.h" +#include "storage/StorageHttpIngestHandler.h" +#include +#include + +namespace nebula { +namespace storage { + +class StorageHttpIngestHandlerTestEnv : public ::testing::Environment { +public: + void SetUp() override { + FLAGS_ws_http_port = 0; + FLAGS_ws_h2_port = 0; + VLOG(1) << "Starting web service..."; + + rootPath_ = std::make_unique("/tmp/StorageHttpIngestHandler.XXXXXX"); + kv_ = TestUtils::initKV(rootPath_->path(), 1); + + WebService::registerHandler("/ingest", [this] { + auto handler = new storage::StorageHttpIngestHandler(); + handler->init(kv_.get()); + return handler; + }); + auto status = WebService::start(); + ASSERT_TRUE(status.ok()) << status; + } + + void TearDown() override { + kv_.reset(); + rootPath_.reset(); + WebService::stop(); + VLOG(1) << "Web service stopped"; + } + +private: + std::unique_ptr rootPath_; + std::unique_ptr kv_; +}; + +TEST(StorageHttpIngestHandlerTest, StorageIngestTest) { + auto path = "/tmp/StorageHttpIngestData.XXXXXX"; + std::unique_ptr externalPath_ = std::make_unique(path); + auto partPath = folly::stringPrintf("%s/1", externalPath_->path()); + ASSERT_TRUE(nebula::fs::FileUtils::makeDir(partPath)); + + auto options = rocksdb::Options(); + auto env = rocksdb::EnvOptions(); + rocksdb::SstFileWriter writer{env, options}; + auto sstPath = folly::stringPrintf("%s/data.sst", partPath.c_str()); + auto status = writer.Open(sstPath); + ASSERT_EQ(rocksdb::Status::OK(), status); + + for (auto i = 0; i < 10; i++) { + status = writer.Put(folly::stringPrintf("key_%d", i), + folly::stringPrintf("val_%d", i)); + ASSERT_EQ(rocksdb::Status::OK(), status); + } + status = writer.Finish(); + ASSERT_EQ(rocksdb::Status::OK(), status); + + { + auto url = "/ingest?space=0"; + auto request = folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), + FLAGS_ws_http_port, url); + auto resp = http::HttpClient::get(request); + ASSERT_TRUE(resp.ok()); + ASSERT_EQ("SSTFile ingest successfully", resp.value()); + } + { + auto url = "/ingest?space=1"; + auto request = folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), + FLAGS_ws_http_port, url); + auto resp = http::HttpClient::get(request); + ASSERT_TRUE(resp.ok()); + ASSERT_EQ("SSTFile ingest failed", resp.value()); + } +} + +} // namespace storage +} // namespace nebula + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + folly::init(&argc, &argv, true); + google::SetStderrLogging(google::INFO); + + ::testing::AddGlobalTestEnvironment(new nebula::storage::StorageHttpIngestHandlerTestEnv()); + + return RUN_ALL_TESTS(); +} diff --git a/src/storage/test/StorageHttpStatusHandlerTest.cpp b/src/storage/test/StorageHttpStatusHandlerTest.cpp index 432fd47ed4d..dfabf6061db 100644 --- a/src/storage/test/StorageHttpStatusHandlerTest.cpp +++ b/src/storage/test/StorageHttpStatusHandlerTest.cpp @@ -8,11 +8,11 @@ #include #include #include "webservice/WebService.h" -#include "webservice/test/TestUtils.h" #include "storage/StorageHttpStatusHandler.h" #include "meta/test/TestUtils.h" #include "storage/test/TestUtils.h" #include "fs/TempDir.h" +#include "http/HttpClient.h" DECLARE_string(meta_server_addrs); DECLARE_int32(load_data_interval_secs); @@ -45,24 +45,37 @@ class StorageHttpStatusHandlerTestEnv : public ::testing::Environment { TEST(StoragehHttpStatusHandlerTest, StorageStatusTest) { { - std::string resp; - ASSERT_TRUE(getUrl("/status", resp)); - ASSERT_EQ(std::string("status=running\n"), resp); + auto url = "/status"; + auto request = folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), + FLAGS_ws_http_port, url); + auto resp = http::HttpClient::get(request); + ASSERT_TRUE(resp.ok()); + ASSERT_EQ("status=running\n", resp.value()); } { - std::string resp; - ASSERT_TRUE(getUrl("", resp)); - ASSERT_EQ(std::string("status=running\n"), resp); + auto url = ""; + auto request = folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), + FLAGS_ws_http_port, url); + auto resp = http::HttpClient::get(request); + ASSERT_TRUE(resp.ok()); + ASSERT_EQ("status=running\n", resp.value()); } { - std::string resp; - ASSERT_TRUE(getUrl("/status?daemon=status", resp)); - ASSERT_EQ(std::string("status=running\n"), resp); + auto url = "/status?daemon=status"; + auto request = folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), + FLAGS_ws_http_port, url); + auto resp = http::HttpClient::get(request); + ASSERT_TRUE(resp.ok()); + ASSERT_EQ("status=running\n", resp.value()); } { - std::string resp; - ASSERT_TRUE(getUrl("/status?daemon=status&returnjson", resp)); - auto json = folly::parseJson(resp); + auto url = "/status?daemon=status&returnjson"; + auto request = folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), + FLAGS_ws_http_port, url); + auto resp = http::HttpClient::get(request); + ASSERT_TRUE(resp.ok()); + + auto json = folly::parseJson(resp.value()); ASSERT_TRUE(json.isArray()); ASSERT_EQ(1UL, json.size()); ASSERT_TRUE(json[0].isObject()); @@ -79,9 +92,12 @@ TEST(StoragehHttpStatusHandlerTest, StorageStatusTest) { ASSERT_EQ("running", it->second.getString()); } { - std::string resp; - ASSERT_TRUE(getUrl("/status123?deamon=status", resp)); - ASSERT_TRUE(resp.empty()); + auto url = "/status123?deamon=status"; + auto request = folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), + FLAGS_ws_http_port, url); + auto resp = http::HttpClient::get(request); + ASSERT_TRUE(resp.ok()); + ASSERT_TRUE(resp.value().empty()); } } diff --git a/src/storage/test/TestUtils.h b/src/storage/test/TestUtils.h index 6c0ef6554d8..7ec38517dd9 100644 --- a/src/storage/test/TestUtils.h +++ b/src/storage/test/TestUtils.h @@ -31,6 +31,7 @@ class TestUtils { public: static std::unique_ptr initKV( const char* rootPath, + int32_t partitionNumber = 6, HostAddr localhost = {0, 0}, meta::MetaClient* mClient = nullptr, bool useMetaServer = false, @@ -52,7 +53,7 @@ class TestUtils { // GraphSpaceID => {PartitionIDs} // 0 => {0, 1, 2, 3, 4, 5} auto& partsMap = memPartMan->partsMap(); - for (auto partId = 0; partId < 6; partId++) { + for (auto partId = 0; partId < partitionNumber; partId++) { partsMap[0][partId] = PartMeta(); } @@ -192,7 +193,7 @@ class TestUtils { bool useMetaServer = false) { auto sc = std::make_unique(); // Always use the Meta Service in this case - sc->kvStore_ = TestUtils::initKV(dataPath, {ip, port}, mClient, true); + sc->kvStore_ = TestUtils::initKV(dataPath, 6, {ip, port}, mClient, true); if (!useMetaServer) { sc->schemaMan_ = TestUtils::mockSchemaMan(1); diff --git a/src/webservice/CMakeLists.txt b/src/webservice/CMakeLists.txt index 59a1305bdba..b896352725b 100644 --- a/src/webservice/CMakeLists.txt +++ b/src/webservice/CMakeLists.txt @@ -7,4 +7,9 @@ nebula_add_library( GetStatsHandler.cpp ) +nebula_add_library( + ws_common_obj OBJECT + Common.cpp +) + add_subdirectory(test) diff --git a/src/webservice/Common.cpp b/src/webservice/Common.cpp new file mode 100644 index 00000000000..b89955a9e73 --- /dev/null +++ b/src/webservice/Common.cpp @@ -0,0 +1,12 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include "webservice/Common.h" + +DEFINE_int32(ws_meta_http_port, 11000, "Port to listen on Meta with HTTP protocol"); +DEFINE_int32(ws_meta_h2_port, 11002, "Port to listen on Meta with HTTP/2 protocol"); +DEFINE_int32(ws_storage_http_port, 12000, "Port to listen on Storage with HTTP protocol"); +DEFINE_int32(ws_storage_h2_port, 12002, "Port to listen on Storage with HTTP/2 protocol"); diff --git a/src/webservice/Common.h b/src/webservice/Common.h index a4564ebc369..676bed6a25b 100644 --- a/src/webservice/Common.h +++ b/src/webservice/Common.h @@ -7,6 +7,13 @@ #ifndef WEBSERVICE_COMMON_H_ #define WEBSERVICE_COMMON_H_ +#include "base/Base.h" + +DECLARE_int32(ws_meta_http_port); +DECLARE_int32(ws_meta_h2_port); +DECLARE_int32(ws_storage_http_port); +DECLARE_int32(ws_storage_h2_port); + namespace nebula { enum class HttpCode { @@ -16,5 +23,32 @@ enum class HttpCode { E_ILLEGAL_ARGUMENT = -3, }; +enum class HttpStatusCode { + OK = 200, + BAD_REQUEST = 400, + FORBIDDEN = 403, + NOT_FOUND = 404, + METHOD_NOT_ALLOWED = 405, +}; + +static std::map statusStringMap { + {HttpStatusCode::OK, "OK"}, + {HttpStatusCode::BAD_REQUEST, "Bad Request"}, + {HttpStatusCode::FORBIDDEN, "Forbidden"}, + {HttpStatusCode::NOT_FOUND, "Not Found"}, + {HttpStatusCode::METHOD_NOT_ALLOWED, "Method Not Allowed"} +}; + +class WebServiceUtils final { +public: + static int32_t to(HttpStatusCode code) { + return static_cast(code); + } + + static std::string toString(HttpStatusCode code) { + return statusStringMap[code]; + } +}; + } // namespace nebula #endif // WEBSERVICE_COMMON_H_ diff --git a/src/webservice/GetFlagsHandler.cpp b/src/webservice/GetFlagsHandler.cpp index fc210f15ba5..92a03310b22 100644 --- a/src/webservice/GetFlagsHandler.cpp +++ b/src/webservice/GetFlagsHandler.cpp @@ -51,7 +51,8 @@ void GetFlagsHandler::onEOM() noexcept { switch (err_) { case HttpCode::E_UNSUPPORTED_METHOD: ResponseBuilder(downstream_) - .status(405, "Method Not Allowed") + .status(WebServiceUtils::to(HttpStatusCode::METHOD_NOT_ALLOWED), + WebServiceUtils::toString(HttpStatusCode::METHOD_NOT_ALLOWED)) .sendWithEOM(); return; default: @@ -61,12 +62,14 @@ void GetFlagsHandler::onEOM() noexcept { folly::dynamic vals = getFlags(); if (returnJson_) { ResponseBuilder(downstream_) - .status(200, "OK") + .status(WebServiceUtils::to(HttpStatusCode::OK), + WebServiceUtils::toString(HttpStatusCode::OK)) .body(folly::toJson(vals)) .sendWithEOM(); } else { ResponseBuilder(downstream_) - .status(200, "OK") + .status(WebServiceUtils::to(HttpStatusCode::OK), + WebServiceUtils::toString(HttpStatusCode::OK)) .body(toStr(vals)) .sendWithEOM(); } diff --git a/src/webservice/GetStatsHandler.cpp b/src/webservice/GetStatsHandler.cpp index 5cc6a0df4cc..b8a6c7a9f72 100644 --- a/src/webservice/GetStatsHandler.cpp +++ b/src/webservice/GetStatsHandler.cpp @@ -49,7 +49,8 @@ void GetStatsHandler::onEOM() noexcept { switch (err_) { case HttpCode::E_UNSUPPORTED_METHOD: ResponseBuilder(downstream_) - .status(405, "Method Not Allowed") + .status(WebServiceUtils::to(HttpStatusCode::METHOD_NOT_ALLOWED), + WebServiceUtils::toString(HttpStatusCode::METHOD_NOT_ALLOWED)) .sendWithEOM(); return; default: @@ -60,12 +61,14 @@ void GetStatsHandler::onEOM() noexcept { folly::dynamic vals = getStats(); if (returnJson_) { ResponseBuilder(downstream_) - .status(200, "OK") + .status(WebServiceUtils::to(HttpStatusCode::OK), + WebServiceUtils::toString(HttpStatusCode::OK)) .body(folly::toJson(vals)) .sendWithEOM(); } else { ResponseBuilder(downstream_) - .status(200, "OK") + .status(WebServiceUtils::to(HttpStatusCode::OK), + WebServiceUtils::toString(HttpStatusCode::OK)) .body(toStr(vals)) .sendWithEOM(); } diff --git a/src/webservice/NotFoundHandler.cpp b/src/webservice/NotFoundHandler.cpp index 7890e829d8f..eb95e8d6cc7 100644 --- a/src/webservice/NotFoundHandler.cpp +++ b/src/webservice/NotFoundHandler.cpp @@ -5,6 +5,7 @@ */ #include "base/Base.h" +#include "webservice/Common.h" #include "webservice/NotFoundHandler.h" #include @@ -27,7 +28,8 @@ void NotFoundHandler::onBody(std::unique_ptr) noexcept { void NotFoundHandler::onEOM() noexcept { ResponseBuilder(downstream_) - .status(404, "Not Found") + .status(WebServiceUtils::to(HttpStatusCode::NOT_FOUND), + WebServiceUtils::toString(HttpStatusCode::NOT_FOUND)) .sendWithEOM(); return; } diff --git a/src/webservice/SetFlagsHandler.cpp b/src/webservice/SetFlagsHandler.cpp index 24902efce9a..31d0da05203 100644 --- a/src/webservice/SetFlagsHandler.cpp +++ b/src/webservice/SetFlagsHandler.cpp @@ -57,12 +57,14 @@ void SetFlagsHandler::onEOM() noexcept { switch (err_) { case HttpCode::E_UNSUPPORTED_METHOD: ResponseBuilder(downstream_) - .status(405, "Method Not Allowed") + .status(WebServiceUtils::to(HttpStatusCode::METHOD_NOT_ALLOWED), + WebServiceUtils::toString(HttpStatusCode::METHOD_NOT_ALLOWED)) .sendWithEOM(); return; case HttpCode::E_UNPROCESSABLE: ResponseBuilder(downstream_) - .status(400, "Bad Request") + .status(WebServiceUtils::to(HttpStatusCode::BAD_REQUEST), + WebServiceUtils::toString(HttpStatusCode::BAD_REQUEST)) .sendWithEOM(); return; default: @@ -72,12 +74,14 @@ void SetFlagsHandler::onEOM() noexcept { if (gflags::SetCommandLineOption(name_.c_str(), value_.c_str()).empty()) { // Failed ResponseBuilder(downstream_) - .status(200, "OK") + .status(WebServiceUtils::to(HttpStatusCode::OK), + WebServiceUtils::toString(HttpStatusCode::OK)) .body("false") .sendWithEOM(); } else { ResponseBuilder(downstream_) - .status(200, "OK") + .status(WebServiceUtils::to(HttpStatusCode::OK), + WebServiceUtils::toString(HttpStatusCode::OK)) .body("true") .sendWithEOM(); } diff --git a/src/webservice/test/CMakeLists.txt b/src/webservice/test/CMakeLists.txt index b00f65142c6..95b661b7d4f 100644 --- a/src/webservice/test/CMakeLists.txt +++ b/src/webservice/test/CMakeLists.txt @@ -4,6 +4,7 @@ nebula_add_test( SOURCES FlagsAccessTest.cpp OBJECTS + $ $ $ $ @@ -23,6 +24,7 @@ nebula_add_test( SOURCES StatsReaderTest.cpp OBJECTS + $ $ $ $ diff --git a/src/webservice/test/TestUtils.h b/src/webservice/test/TestUtils.h index d92e1c11b8c..96b0eed50bb 100644 --- a/src/webservice/test/TestUtils.h +++ b/src/webservice/test/TestUtils.h @@ -6,12 +6,10 @@ #include "base/Base.h" #include "webservice/WebService.h" -#include "process/ProcessUtils.h" +#include "http/HttpClient.h" namespace nebula { -using nebula::ProcessUtils; - bool getUrl(const std::string& urlPath, std::string& respBody) { auto url = folly::stringPrintf("http://%s:%d%s", FLAGS_ws_ip.c_str(), @@ -19,9 +17,7 @@ bool getUrl(const std::string& urlPath, std::string& respBody) { urlPath.c_str()); VLOG(1) << "Retrieving url: " << url; - auto command = folly::stringPrintf("/usr/bin/curl -G \"%s\" 2> /dev/null", - url.c_str()); - auto result = ProcessUtils::runCommand(command.c_str()); + auto result = http::HttpClient::get(url.c_str()); if (!result.ok()) { LOG(ERROR) << "Failed to run curl: " << result.status(); return false;