Support SSL authentication with Kafka in routine load job #1235

Merged: 36 commits, merged Jun 7, 2019

Commits (36)
117e405  add small file manager (morningman, May 29, 2019)
7098512  fix bug1 (morningman, May 29, 2019)
cc989fd  fix bug2 (morningman, May 29, 2019)
3b2ed78  fix bug3 (morningman, May 29, 2019)
5315f3a  fix get log action (morningman, May 29, 2019)
e98aa5d  add small file write (morningman, May 30, 2019)
66e6e3f  fix download bug (morningman, May 30, 2019)
8e58645  fix too many abort tasks (morningman, May 30, 2019)
5bb8dec  add FILE: (morningman, May 30, 2019)
96e82f5  add GetSmallFileAction (morningman, May 30, 2019)
efb992a  register getSmallFileAction (morningman, May 30, 2019)
870db7c  no password (morningman, May 30, 2019)
dc6271a  add heartbeat master http port (morningman, May 31, 2019)
33a00f9  replay create file (morningman, May 31, 2019)
822d6e9  modify BE (morningman, May 31, 2019)
d5d6f0f  run ok (morningman, Jun 1, 2019)
816acb6  FE get partitions from BE (morningman, Jun 1, 2019)
e034cac  first ok (morningman, Jun 1, 2019)
961c4c6  FE works (morningman, Jun 2, 2019)
20f7e28  modify cancel load msg (morningman, Jun 2, 2019)
63f3f23  Add BE ut (morningman, Jun 3, 2019)
4aafe7b  add get_log_file doc (morningman, Jun 3, 2019)
5d72da0  Update docs/documentation/cn/administrator-guide/load-data/routine-lo… (morningman, Jun 3, 2019)
83261ed  Update docs/documentation/cn/administrator-guide/http-actions/fe_get_… (morningman, Jun 3, 2019)
5eeef55  Update fe/src/main/java/org/apache/doris/analysis/CreateFileStmt.java (morningman, Jun 4, 2019)
d1caa69  Update docs/help/Contents/Data Definition/ddl_stmt.md (morningman, Jun 4, 2019)
903d0e1  add url support for create file stmt (morningman, Jun 4, 2019)
d187499  fix bugs (morningman, Jun 4, 2019)
9fea318  fix bugs2 (morningman, Jun 4, 2019)
d423a05  Update be/src/runtime/routine_load/data_consumer.cpp (morningman, Jun 5, 2019)
be2ce1d  Update be/src/runtime/routine_load/data_consumer.cpp (morningman, Jun 5, 2019)
2997f5f  modify by review 1 (morningman, Jun 5, 2019)
ab70d3f  change the way to get small file (morningman, Jun 5, 2019)
96681e5  modify to way of using proto (morningman, Jun 6, 2019)
d3911b6  change routine load thread pool limit to 80 (morningman, Jun 6, 2019)
ec563c9  remove saveTofile() in smallFileMgr (morningman, Jun 7, 2019)
7 changes: 0 additions & 7 deletions be/src/agent/agent_server.cpp
@@ -70,13 +70,6 @@ AgentServer::AgentServer(ExecEnv* exec_env,
}
}

// create tmp dir
// boost::filesystem::path tmp_path(config::agent_tmp_dir);
// if (boost::filesystem::exists(tmp_path)) {
// boost::filesystem::remove_all(tmp_path);
// }
// boost::filesystem::create_directories(config::agent_tmp_dir);

// init task worker pool
_create_table_workers = new TaskWorkerPool(
TaskWorkerPool::TaskWorkerType::CREATE_TABLE,
4 changes: 4 additions & 0 deletions be/src/agent/heartbeat_server.cpp
@@ -143,6 +143,10 @@ Status HeartbeatServer::_heartbeat(
}
}

if (master_info.__isset.http_port) {
_master_info->__set_http_port(master_info.http_port);
}

if (need_report) {
LOG(INFO) << "Master FE is changed or restarted. report tablet and disk info immediately";
_olap_engine->report_notify(true);
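With the master FE's HTTP port now carried in the heartbeat, a BE knows where to reach the FE over HTTP. A plausible use, consistent with the GetSmallFileAction commit in this PR although the exact endpoint is not shown in this excerpt, is downloading FE-managed small files such as SSL certificates. The following is a minimal, hypothetical illustration only; the path and parameter names are assumptions, not taken from the PR.

// Hypothetical illustration (not code from this PR): build an HTTP URL for
// fetching an FE-managed small file through something like GetSmallFileAction.
#include <cstdint>
#include <iostream>
#include <sstream>
#include <string>

std::string build_small_file_url(const std::string& master_host, int32_t http_port,
                                 int64_t file_id, const std::string& token) {
    std::stringstream ss;
    // Path and parameter names below are assumptions for illustration only.
    ss << "http://" << master_host << ":" << http_port
       << "/api/get_small_file?file_id=" << file_id << "&token=" << token;
    return ss.str();
}

int main() {
    std::cout << build_small_file_url("fe_host", 8030, 10001, "cluster_token") << std::endl;
    return 0;
}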
17 changes: 6 additions & 11 deletions be/src/common/config.h
@@ -114,14 +114,10 @@ namespace config {
CONF_Int32(sleep_one_second, "1");
// sleep time for five seconds
CONF_Int32(sleep_five_seconds, "5");
// trans file tools dir
CONF_String(trans_file_tool_path, "${DORIS_HOME}/tools/trans_file_tool/trans_files.sh");
// agent tmp dir
CONF_String(agent_tmp_dir, "${DORIS_HOME}/tmp");

// log dir
CONF_String(sys_log_dir, "${DORIS_HOME}/log");
CONF_String(user_function_dir, "${DORIS_HOME}/lib/usr");
CONF_String(user_function_dir, "${DORIS_HOME}/lib/udf");
// INFO, WARNING, ERROR, FATAL
CONF_String(sys_log_level, "INFO");
// TIME-DAY, TIME-HOUR, SIZE-MB-nnn
@@ -208,7 +204,7 @@
CONF_Int32(file_descriptor_cache_clean_interval, "3600");
CONF_Int32(disk_stat_monitor_interval, "5");
CONF_Int32(unused_index_monitor_interval, "30");
CONF_String(storage_root_path, "${DORIS_HOME}/storage");
CONF_String(storage_root_path, "${DORIS_HOME}/data");
CONF_Int32(min_percentage_of_error_disk, "50");
CONF_Int32(default_num_rows_per_data_block, "1024");
CONF_Int32(default_num_rows_per_column_file_block, "1024");
@@ -256,12 +252,8 @@

// Port to start debug webserver on
CONF_Int32(webserver_port, "8040");
// Interface to start debug webserver on. If blank, webserver binds to 0.0.0.0
CONF_String(webserver_interface, "");
CONF_String(webserver_doc_root, "${DORIS_HOME}");
// Number of webserver workers
CONF_Int32(webserver_num_workers, "5");
// If true, webserver may serve static files from the webserver_doc_root
CONF_Bool(enable_webserver_doc_root, "true");
// Period to update rate counters and sampling counters in ms.
CONF_Int32(periodic_counter_update_period_ms, "500");

@@ -419,6 +411,9 @@
// same cache size configuration.
// TODO(cmy): use different config to set different client cache if necessary.
CONF_Int32(max_client_cache_size_per_host, "10");

// Dir to save files downloaded by SmallFileMgr
CONF_String(small_file_dir, "${DORIS_HOME}/lib/small_file/");
} // namespace config

} // namespace doris
1 change: 1 addition & 0 deletions be/src/runtime/CMakeLists.txt
@@ -101,6 +101,7 @@ add_library(Runtime STATIC
routine_load/data_consumer_group.cpp
routine_load/data_consumer_pool.cpp
routine_load/routine_load_task_executor.cpp
small_file_mgr.cpp
)

# This test runs forever so should not be part of 'make test'
3 changes: 3 additions & 0 deletions be/src/runtime/exec_env.h
@@ -52,6 +52,7 @@ class TmpFileMgr;
class WebPageHandler;
class StreamLoadExecutor;
class RoutineLoadTaskExecutor;
class SmallFileMgr;

class BackendServiceClient;
class FrontendServiceClient;
@@ -112,6 +113,7 @@ class ExecEnv {
BufferPool* buffer_pool() { return _buffer_pool; }
TabletWriterMgr* tablet_writer_mgr() { return _tablet_writer_mgr; }
LoadStreamMgr* load_stream_mgr() { return _load_stream_mgr; }
SmallFileMgr* small_file_mgr() { return _small_file_mgr; }

const std::vector<StorePath>& store_paths() const { return _store_paths; }
void set_store_paths(const std::vector<StorePath>& paths) { _store_paths = paths; }
@@ -167,6 +169,7 @@

StreamLoadExecutor* _stream_load_executor = nullptr;
RoutineLoadTaskExecutor* _routine_load_task_executor = nullptr;
SmallFileMgr* _small_file_mgr = nullptr;
};

}
3 changes: 3 additions & 0 deletions be/src/runtime/exec_env_init.cpp
@@ -47,6 +47,7 @@
#include "runtime/routine_load/routine_load_task_executor.h"
#include "runtime/stream_load/load_stream_mgr.h"
#include "runtime/stream_load/stream_load_executor.h"
#include "runtime/small_file_mgr.h"
#include "util/pretty_printer.h"
#include "util/doris_metrics.h"
#include "util/brpc_stub_cache.h"
@@ -99,6 +100,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
_brpc_stub_cache = new BrpcStubCache();
_stream_load_executor = new StreamLoadExecutor(this);
_routine_load_task_executor = new RoutineLoadTaskExecutor(this);
_small_file_mgr = new SmallFileMgr(this, config::small_file_dir);

_backend_client_cache->init_metrics(DorisMetrics::metrics(), "backend");
_frontend_client_cache->init_metrics(DorisMetrics::metrics(), "frontend");
@@ -118,6 +120,7 @@
exit(-1);
}
_broker_mgr->init();
_small_file_mgr->init();
_init_mem_tracker();
RETURN_IF_ERROR(_tablet_writer_mgr->start_bg_worker());
return Status::OK;
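The hunks above register a new SmallFileMgr in ExecEnv and initialize it from config::small_file_dir. The manager's own header is not part of this excerpt, so the following is only a hypothetical sketch of the interface the rest of the diff relies on: the constructor, init(), and get_file() signatures are inferred from the call sites shown here and in data_consumer.cpp further down, while the members and caching behavior are assumptions.

// Hypothetical sketch of the SmallFileMgr interface assumed by this diff
// (the real be/src/runtime/small_file_mgr.h is not shown in this excerpt).
#pragma once

#include <cstdint>
#include <map>
#include <mutex>
#include <string>

#include "common/status.h"

namespace doris {

class ExecEnv;

class SmallFileMgr {
public:
    SmallFileMgr(ExecEnv* env, const std::string& local_dir)
        : _env(env), _local_dir(local_dir) {}

    // Scan local_dir and index files that have already been downloaded.
    Status init();

    // Return the local path of the file identified by file_id. If the file is
    // not cached locally or its md5 does not match, download it from the FE
    // master (whose http port is learned via heartbeat) and verify it first.
    Status get_file(int64_t file_id, const std::string& md5, std::string* file_path);

private:
    ExecEnv* _env;
    std::string _local_dir;
    std::mutex _lock;
    std::map<int64_t, std::string> _local_files; // file id -> local path
};

} // namespace doris

In the data_consumer.cpp diff below, get_file() is what turns a FILE:<file_id>:<md5> property value into a local certificate path before it is handed to librdkafka.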
5 changes: 3 additions & 2 deletions be/src/runtime/pull_load_task_mgr.cpp
@@ -118,13 +118,13 @@ PullLoadTaskMgr::~PullLoadTaskMgr() {
Status PullLoadTaskMgr::init() {
auto st = load_task_ctxes();
if (!st.ok()) {
LOG(WARNING) << "Load task from directory failed. because " << st.get_error_msg();
_dir_exist = false;
}
return Status::OK;
}

Status PullLoadTaskMgr::load_task_ctxes() {
/*
// 1. scan all files
std::vector<std::string> files;
RETURN_IF_ERROR(FileUtils::scan_dir(_path, &files));
@@ -141,8 +141,9 @@ Status PullLoadTaskMgr::load_task_ctxes() {
<< ", status:" << status.get_error_msg();
}
}
*/

return Status::OK;
return Status("Not implemented");
}

Status PullLoadTaskMgr::load_task_ctx(const std::string& file_path) {
111 changes: 104 additions & 7 deletions be/src/runtime/routine_load/data_consumer.cpp
@@ -24,6 +24,7 @@

#include "common/status.h"
#include "service/backend_options.h"
#include "runtime/small_file_mgr.h"
#include "util/defer_op.h"
#include "util/stopwatch.hpp"
#include "util/uid_util.h"
@@ -52,9 +53,17 @@ Status KafkaDataConsumer::init(StreamLoadContext* ctx) {

std::string errstr;
auto set_conf = [&conf, &errstr](const std::string& conf_key, const std::string& conf_val) {
if (conf->set(conf_key, conf_val, errstr) != RdKafka::Conf::CONF_OK) {
RdKafka::Conf::ConfResult res = conf->set(conf_key, conf_val, errstr);
if (res == RdKafka::Conf::CONF_UNKNOWN) {
// ignore unknown config
return Status::OK;
} else if (errstr.find("not supported") != std::string::npos) {
// some java-only properties may be passed here, and librdkafka will return 'xxx' not supported
// ignore it
return Status::OK;
} else if (res != RdKafka::Conf::CONF_OK) {
std::stringstream ss;
ss << "failed to set '" << conf_key << "'";
ss << "PAUSE: failed to set '" << conf_key << "', value: '" << conf_val << "', err: " << errstr;
LOG(WARNING) << ss.str();
return Status(ss.str());
}
@@ -73,21 +82,40 @@
RETURN_IF_ERROR(set_conf("api.version.fallback.ms", "0"));

for (auto& item : ctx->kafka_info->properties) {
RETURN_IF_ERROR(set_conf(item.first, item.second));
if (boost::algorithm::starts_with(item.second, "FILE:")) {
// file property should have the format: FILE:file_id:md5
std::vector<std::string> parts;
boost::split(parts, item.second, boost::is_any_of(":"));
if (parts.size() != 3) {
return Status("PAUSE: Invalid file property of kafka: " + item.second);
}
int64_t file_id = std::stol(parts[1]);
std::string file_path;
Status st = ctx->exec_env()->small_file_mgr()->get_file(file_id, parts[2], &file_path);
if (!st.ok()) {
std::stringstream ss;
ss << "PAUSE: failed to get file for config: " << item.first << ", error: " << st.get_error_msg();
return Status(ss.str());
}
RETURN_IF_ERROR(set_conf(item.first, file_path));
} else {
RETURN_IF_ERROR(set_conf(item.first, item.second));
}
_custom_properties.emplace(item.first, item.second);
}

if (conf->set("event_cb", &_k_event_cb, errstr) != RdKafka::Conf::CONF_OK) {
std::stringstream ss;
ss << "failed to set 'event_cb'";
ss << "PAUSE: failed to set 'event_cb'";
LOG(WARNING) << ss.str();
return Status(ss.str());
}

// create consumer
_k_consumer = RdKafka::KafkaConsumer::create(conf, errstr);
if (!_k_consumer) {
LOG(WARNING) << "failed to create kafka consumer";
return Status("failed to create kafka consumer");
LOG(WARNING) << "PAUSE: failed to create kafka consumer: " << errstr;
return Status("PAUSE: failed to create kafka consumer: " + errstr);
}

VLOG(3) << "finished to init kafka consumer. " << ctx->brief();
@@ -174,7 +202,7 @@ Status KafkaDataConsumer::group_consume(
case RdKafka::ERR__TIMED_OUT:
// leave the status as OK, because this may happen
// if there is no data in kafka.
LOG(WARNING) << "kafka consume timeout: " << _id;
LOG(INFO) << "kafka consume timeout: " << _id;
break;
default:
LOG(WARNING) << "kafka consume failed: " << _id
@@ -199,6 +227,66 @@ Status KafkaDataConsumer::group_consume(
return st;
}

Status KafkaDataConsumer::get_partition_meta(std::vector<int32_t>* partition_ids) {
// create topic conf
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
auto conf_deleter = [tconf] () { delete tconf; };
DeferOp delete_conf(std::bind<void>(conf_deleter));

// create topic
std::string errstr;
RdKafka::Topic *topic = RdKafka::Topic::create(_k_consumer, _topic, tconf, errstr);
if (topic == nullptr) {
std::stringstream ss;
ss << "failed to create topic: " << errstr;
LOG(WARNING) << ss.str();
return Status(ss.str());
}
auto topic_deleter = [topic] () { delete topic; };
DeferOp delete_topic(std::bind<void>(topic_deleter));

// get topic metadata
RdKafka::Metadata* metadata = nullptr;
RdKafka::ErrorCode err = _k_consumer->metadata(true/* for this topic */, topic, &metadata, 5000);
if (err != RdKafka::ERR_NO_ERROR) {
std::stringstream ss;
ss << "failed to get partition meta: " << RdKafka::err2str(err);
LOG(WARNING) << ss.str();
return Status(ss.str());
}
auto meta_deleter = [metadata] () { delete metadata; };
DeferOp delete_meta(std::bind<void>(meta_deleter));

// get partition ids
RdKafka::Metadata::TopicMetadataIterator it;
for (it = metadata->topics()->begin(); it != metadata->topics()->end(); ++it) {
if ((*it)->topic() != _topic) {
continue;
}

if ((*it)->err() != RdKafka::ERR_NO_ERROR) {
std::stringstream ss;
ss << "error: " << err2str((*it)->err());
if ((*it)->err() == RdKafka::ERR_LEADER_NOT_AVAILABLE) {
ss << ", try again";
}
LOG(WARNING) << ss.str();
return Status(ss.str());
}

RdKafka::TopicMetadata::PartitionMetadataIterator ip;
for (ip = (*it)->partitions()->begin(); ip != (*it)->partitions()->end(); ++ip) {
partition_ids->push_back((*ip)->id());
}
}

if (partition_ids->empty()) {
return Status("no partition in this topic");
}

return Status::OK;
}

Status KafkaDataConsumer::cancel(StreamLoadContext* ctx) {
std::unique_lock<std::mutex> l(_lock);
if (!_init) {
@@ -225,6 +313,15 @@ bool KafkaDataConsumer::match(StreamLoadContext* ctx) {
if (_brokers != ctx->kafka_info->brokers || _topic != ctx->kafka_info->topic) {
return false;
}
// check properties
if (_custom_properties.size() != ctx->kafka_info->properties.size()) {
return false;
}
for (auto& item : ctx->kafka_info->properties) {
if (_custom_properties.find(item.first) == _custom_properties.end()) {
return false;
}
}
return true;
}

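To make the FILE: handling in KafkaDataConsumer::init() concrete, here is a standalone, illustrative-only snippet showing roughly what ctx->kafka_info->properties might contain for an SSL-authenticated routine load job, and how a FILE:<file_id>:<md5> value is split before SmallFileMgr resolves it to a local path. The property names, file ids, md5 values, and the fake path below are invented for this example and are not taken from the PR.

// Illustrative-only example of FILE:<file_id>:<md5> property resolution.
#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <vector>

#include <boost/algorithm/string.hpp>

int main() {
    std::map<std::string, std::string> props = {
        {"security.protocol", "ssl"},
        // Certificate and key files are not passed inline; the FE registers
        // them with CREATE FILE and the job carries a reference that the BE
        // resolves to a local file.
        {"ssl.ca.location", "FILE:10001:1f3870be274f6c49b3e31a0c6728957f"},
        {"ssl.certificate.location", "FILE:10002:8277e0910d750195b448797616e091ad"},
    };

    for (const auto& item : props) {
        if (!boost::algorithm::starts_with(item.second, "FILE:")) {
            std::cout << item.first << " = " << item.second << std::endl;
            continue;
        }
        std::vector<std::string> parts;
        boost::split(parts, item.second, boost::is_any_of(":"));
        // parts is {"FILE", "<file_id>", "<md5>"}; anything else is rejected
        // by the real code with a PAUSE error.
        int64_t file_id = std::stol(parts[1]);
        const std::string& md5 = parts[2];
        // In the BE, SmallFileMgr::get_file(file_id, md5, &path) would return
        // the verified local path here; we only fake one for the printout.
        std::string local_path = "/path/to/small_file/" + parts[1] + "." + md5;
        std::cout << item.first << " = " << local_path
                  << " (resolved from file id " << file_id << ")" << std::endl;
    }
    return 0;
}

When the reference is malformed or the file cannot be fetched, the real code returns a status prefixed with PAUSE:, presumably so the FE pauses the routine load job instead of retrying endlessly.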
5 changes: 5 additions & 0 deletions be/src/runtime/routine_load/data_consumer.h
@@ -19,6 +19,7 @@

#include <ctime>
#include <mutex>
#include <unordered_map>

#include "librdkafka/rdkafkacpp.h"

@@ -141,9 +142,13 @@ class KafkaDataConsumer : public DataConsumer {
// start the consumer and put msgs to queue
Status group_consume(BlockingQueue<RdKafka::Message*>* queue, int64_t max_running_time_ms);

// get the partition ids of the topic
Status get_partition_meta(std::vector<int32_t>* partition_ids);

private:
std::string _brokers;
std::string _topic;
std::unordered_map<std::string, std::string> _custom_properties;

KafkaEventCb _k_event_cb;
RdKafka::KafkaConsumer* _k_consumer = nullptr;
4 changes: 2 additions & 2 deletions be/src/runtime/routine_load/data_consumer_pool.cpp
@@ -50,7 +50,7 @@ Status DataConsumerPool::get_consumer(
break;
default:
std::stringstream ss;
ss << "unknown routine load task type: " << ctx->load_type;
ss << "PAUSE: unknown routine load task type: " << ctx->load_type;
return Status(ss.str());
}

@@ -66,7 +66,7 @@ Status DataConsumerPool::get_consumer_grp(
StreamLoadContext* ctx,
std::shared_ptr<DataConsumerGroup>* ret) {
if (ctx->load_src_type != TLoadSourceType::KAFKA) {
return Status("Currently nly support consumer group for Kafka data source");
return Status("PAUSE: Currently only support consumer group for Kafka data source");
}
DCHECK(ctx->kafka_info);
