Support SSL authentication with Kafka in routine load job #1235

Merged: 36 commits, Jun 7, 2019
Commits (36, all by morningman):

117e405  add small file manager (May 29, 2019)
7098512  fix bug1 (May 29, 2019)
cc989fd  fix bug2 (May 29, 2019)
3b2ed78  fix bug3 (May 29, 2019)
5315f3a  fix get log action (May 29, 2019)
e98aa5d  add small file write (May 30, 2019)
66e6e3f  fix download bug (May 30, 2019)
8e58645  fix too many abort tasks (May 30, 2019)
5bb8dec  add FILE: (May 30, 2019)
96e82f5  add GetSmallFileAction (May 30, 2019)
efb992a  register getSmallFileAction (May 30, 2019)
870db7c  no password (May 30, 2019)
dc6271a  add heartbeat master http port (May 31, 2019)
33a00f9  replay create file (May 31, 2019)
822d6e9  modify BE (May 31, 2019)
d5d6f0f  run ok (Jun 1, 2019)
816acb6  FE get partitions from BE (Jun 1, 2019)
e034cac  first ok (Jun 1, 2019)
961c4c6  FE works (Jun 2, 2019)
20f7e28  modify cancel load msg (Jun 2, 2019)
63f3f23  Add BE ut (Jun 3, 2019)
4aafe7b  add get_log_file doc (Jun 3, 2019)
5d72da0  Update docs/documentation/cn/administrator-guide/load-data/routine-lo… (Jun 3, 2019)
83261ed  Update docs/documentation/cn/administrator-guide/http-actions/fe_get_… (Jun 3, 2019)
5eeef55  Update fe/src/main/java/org/apache/doris/analysis/CreateFileStmt.java (Jun 4, 2019)
d1caa69  Update docs/help/Contents/Data Definition/ddl_stmt.md (Jun 4, 2019)
903d0e1  add url support for create file stmt (Jun 4, 2019)
d187499  fix bugs (Jun 4, 2019)
9fea318  fix bugs2 (Jun 4, 2019)
d423a05  Update be/src/runtime/routine_load/data_consumer.cpp (Jun 5, 2019)
be2ce1d  Update be/src/runtime/routine_load/data_consumer.cpp (Jun 5, 2019)
2997f5f  modify by review 1 (Jun 5, 2019)
ab70d3f  change the way to get small file (Jun 5, 2019)
96681e5  modify to way of using proto (Jun 6, 2019)
d3911b6  change routine load thread pool limit to 80 (Jun 6, 2019)
ec563c9  remove saveTofile() in smallFileMgr (Jun 7, 2019)
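Taken together, the commit history suggests the following flow: certificate files are registered on the FE (the new CREATE FILE support, with optional download-by-URL), cached on each BE by the small file manager, and referenced from routine load job properties via a FILE: prefix so the BE can hand local paths to librdkafka. A minimal sketch of what the BE-side consumer configuration amounts to; the property names are standard librdkafka keys, and the paths are placeholders standing in for files the small file manager has cached:

    #include <librdkafka/rdkafkacpp.h>
    #include <string>

    // Sketch only: configure an SSL-authenticated Kafka consumer.
    // The file paths below are assumptions for illustration; in the PR they
    // would be resolved through the BE small file manager cache.
    RdKafka::Conf* make_ssl_conf() {
        std::string errstr;
        RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
        conf->set("security.protocol", "ssl", errstr);
        conf->set("ssl.ca.location", "/cache/small_files/ca.pem", errstr);
        conf->set("ssl.certificate.location", "/cache/small_files/client.pem", errstr);
        conf->set("ssl.key.location", "/cache/small_files/client.key", errstr);
        return conf;
    }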
be/src/runtime/routine_load/data_consumer.cpp (2 additions & 2 deletions)

@@ -236,7 +236,7 @@ Status KafkaDataConsumer::get_partition_meta(std::vector<int32_t>* partition_ids
     // create topic
     std::string errstr;
     RdKafka::Topic *topic = RdKafka::Topic::create(_k_consumer, _topic, tconf, errstr);
-    if (!topic) {
+    if (topic == nullptr) {
         std::stringstream ss;
         ss << "failed to create topic: " << errstr;
         LOG(WARNING) << ss.str();
@@ -246,7 +246,7 @@ Status KafkaDataConsumer::get_partition_meta(std::vector<int32_t>* partition_ids
     DeferOp delete_topic(std::bind<void>(topic_deleter));

     // get topic metadata
-    RdKafka::Metadata *metadata;
+    RdKafka::Metadata* metadata = nullptr;
     RdKafka::ErrorCode err = _k_consumer->metadata(true/* for this topic */, topic, &metadata, 5000);
     if (err != RdKafka::ERR_NO_ERROR) {
         std::stringstream ss;
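The second change matters because metadata is an out-parameter: initializing it to nullptr avoids touching an indeterminate pointer if the call fails. For context, collecting partition ids from the fetched metadata follows the usual librdkafka pattern; a hedged sketch of the continuation, assuming the standard librdkafka C++ API (this loop is not part of the hunk above):

    // Sketch: walk the topic metadata and collect partition ids.
    // The Metadata object is owned by the caller and must be freed.
    for (const RdKafka::TopicMetadata* t : *metadata->topics()) {
        if (t->topic() != _topic) {
            continue;  // metadata may describe other topics
        }
        for (const RdKafka::PartitionMetadata* p : *t->partitions()) {
            partition_ids->push_back(p->id());
        }
    }
    delete metadata;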
be/src/runtime/routine_load/routine_load_task_executor.cpp (17 additions & 4 deletions)

@@ -34,15 +34,27 @@
 namespace doris {

 Status RoutineLoadTaskExecutor::get_kafka_partition_meta(
-        const TKafkaMetaProxyRequest& request, std::vector<int32_t>* partition_ids) {
-    DCHECK(request.__isset.kafka_info);
+        const PKafkaMetaProxyRequest& request, std::vector<int32_t>* partition_ids) {
+    DCHECK(request.has_kafka_info());

     // This context is meaningless, just for unifying the interface
     StreamLoadContext ctx(_exec_env);
     ctx.load_type = TLoadType::ROUTINE_LOAD;
     ctx.load_src_type = TLoadSourceType::KAFKA;
     ctx.label = "NaN";
-    ctx.kafka_info = new KafkaLoadInfo(request.kafka_info);
+
+    // convert PKafkaInfo to TKafkaLoadInfo
+    TKafkaLoadInfo t_info;
+    t_info.brokers = request.kafka_info().brokers();
+    t_info.topic = request.kafka_info().topic();
+    std::map<std::string, std::string> properties;
+    for (int i = 0; i < request.kafka_info().properties_size(); ++i) {
+        const PStringPair& pair = request.kafka_info().properties(i);
+        properties.emplace(pair.key(), pair.val());
+    }
+    t_info.__set_properties(std::move(properties));
+
+    ctx.kafka_info = new KafkaLoadInfo(t_info);
     ctx.need_rollback = false;

     std::shared_ptr<DataConsumer> consumer;
@@ -63,7 +75,8 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
         return Status::OK;
     }

-    if (_thread_pool.get_queue_size() > 100) {
+    // the max queue size of thread pool is 100, here we use 80 as a very conservative limit
+    if (_thread_pool.get_queue_size() >= 80) {
         LOG(INFO) << "too many tasks in queue: " << _thread_pool.get_queue_size() << ", reject task: " << UniqueId(task.id);
         return Status("too many tasks");
     }
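The second hunk also tightens admission control: instead of rejecting only once the queue exceeds the pool's actual capacity of 100, submissions now fail fast at 80 with a clear "too many tasks" status.

Because the interface now takes a protobuf message rather than a Thrift struct, a caller (for example, the BE unit test added in 63f3f23) might build the request roughly as below. This is a hedged sketch: the PKafkaLoadInfo message name is inferred from the accessors in the hunk above, executor stands in for a RoutineLoadTaskExecutor instance, and all values are placeholders.

    // Sketch: construct a proto request and ask the executor for partition ids.
    PKafkaMetaProxyRequest request;
    PKafkaLoadInfo* info = request.mutable_kafka_info();  // message name assumed
    info->set_brokers("127.0.0.1:9092");                  // placeholder broker
    info->set_topic("test_topic");                        // placeholder topic
    PStringPair* prop = info->add_properties();
    prop->set_key("client.id");                           // placeholder property
    prop->set_val("doris_be_test");

    std::vector<int32_t> partition_ids;
    Status st = executor->get_kafka_partition_meta(request, &partition_ids);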
be/src/runtime/routine_load/routine_load_task_executor.h (3 additions & 1 deletion)

@@ -25,6 +25,8 @@
 #include "util/thread_pool.hpp"
 #include "util/uid_util.h"

+#include "gen_cpp/internal_service.pb.h"
+
 namespace doris {

 class ExecEnv;
@@ -56,7 +58,7 @@ class RoutineLoadTaskExecutor {
     // submit a routine load task
     Status submit_task(const TRoutineLoadTask& task);

-    Status get_kafka_partition_meta(const TKafkaMetaProxyRequest& request, std::vector<int32_t>* partition_ids);
+    Status get_kafka_partition_meta(const PKafkaMetaProxyRequest& request, std::vector<int32_t>* partition_ids);

 private:
     // execute the task
be/src/runtime/small_file_mgr.cpp (1 addition & 1 deletion)

@@ -110,7 +110,7 @@ Status SmallFileMgr::get_file(
     Status st = _check_file(entry, md5);
     if (!st.ok()) {
         // check file failed, we should remove this cache and download it from FE again
-        if (!remove(entry.path.c_str())) {
+        if (remove(entry.path.c_str()) != 0) {
            std::stringstream ss;
            ss << "failed to remove file: " << file_id << ", err: "<< std::strerror(errno);
            return Status(ss.str());
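This fixes an inverted check: remove() follows the C convention of returning 0 on success, so the old !remove(...) branch fired on successful deletion and skipped the error path on failure. A minimal illustration of the corrected pattern, with path standing in for entry.path:

    #include <cstdio>    // std::remove
    #include <cerrno>    // errno
    #include <cstring>   // std::strerror

    // 0 means success; any nonzero result leaves the reason in errno.
    if (std::remove(path.c_str()) != 0) {
        LOG(WARNING) << "failed to remove file: " << path
                     << ", err: " << std::strerror(errno);
    }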
be/src/service/backend_service.cpp (0 additions & 17 deletions)

@@ -245,21 +245,4 @@ void BackendService::submit_routine_load_task(
     return Status::OK.to_thrift(&t_status);
 }

-void BackendService::get_info(TProxyResult& result, const TProxyRequest& request) {
-    result.status.status_code = TStatusCode::OK;
-    if (request.__isset.kafka_meta_request) {
-        std::vector<int32_t> partition_ids;
-        Status st = _exec_env->routine_load_task_executor()->get_kafka_partition_meta(request.kafka_meta_request, &partition_ids);
-        if (!st.ok()) {
-            st.to_thrift(&result.status);
-            return;
-        } else {
-            TKafkaMetaProxyResult kafka_result;
-            kafka_result.__set_partition_ids(std::move(partition_ids));
-            result.__set_kafka_meta_result(std::move(kafka_result));
-            return;
-        }
-    }
-}
-
 } // namespace doris
be/src/service/backend_service.h (0 additions & 3 deletions)

@@ -148,9 +148,6 @@ class BackendService : public BackendServiceIf {
     virtual void get_tablet_stat(TTabletStatResult& result) override;

     virtual void submit_routine_load_task(TStatus& t_status, const std::vector<TRoutineLoadTask>& tasks) override;
-
-    virtual void get_info(TProxyResult& result, const TProxyRequest& request) override;
-
 private:
     Status start_plan_fragment_execution(const TExecPlanFragmentParams& exec_params);
be/src/service/internal_service.cpp (25 additions & 0 deletions)

@@ -28,6 +28,7 @@
 #include "util/thrift_util.h"
 #include "runtime/buffer_control_block.h"
 #include "runtime/result_buffer_mgr.h"
+#include "runtime/routine_load/routine_load_task_executor.h"

 namespace doris {

@@ -183,6 +184,30 @@ void PInternalServiceImpl<T>::trigger_profile_report(
     st.to_protobuf(result->mutable_status());
 }

+template<typename T>
+void PInternalServiceImpl<T>::get_info(
+        google::protobuf::RpcController* controller,
+        const PProxyRequest* request,
+        PProxyResult* response,
+        google::protobuf::Closure* done) {
+
+    brpc::ClosureGuard closure_guard(done);
+    if (request->has_kafka_meta_request()) {
+        std::vector<int32_t> partition_ids;
+        Status st = _exec_env->routine_load_task_executor()->get_kafka_partition_meta(request->kafka_meta_request(), &partition_ids);
+        if (st.ok()) {
+            PKafkaMetaProxyResult* kafka_result = response->mutable_kafka_meta_result();
+            for (int32_t id : partition_ids) {
+                kafka_result->add_partition_ids(id);
+            }
+        }
+        st.to_protobuf(response->mutable_status());
+        return;
+    }
+    Status::OK.to_protobuf(response->mutable_status());
+}
+
 template class PInternalServiceImpl<PBackendService>;
 template class PInternalServiceImpl<palo::PInternalService>;
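With the Thrift get_info removed from BackendService above, this brpc method becomes the single proxy endpoint, and the FE reaches it through BackendServiceProxy. A hedged sketch of a synchronous brpc client call, assuming PBackendService_Stub is the stub generated from internal_service.proto, with placeholder address, brokers, and topic:

    #include <brpc/channel.h>
    #include "gen_cpp/internal_service.pb.h"

    // Sketch: synchronously fetch Kafka partition ids via the new get_info RPC.
    int fetch_partition_ids() {
        brpc::Channel channel;
        if (channel.Init("127.0.0.1:8060", nullptr /* default options */) != 0) {
            return -1;  // placeholder address for the BE's brpc port
        }
        PBackendService_Stub stub(&channel);
        brpc::Controller cntl;
        PProxyRequest request;
        PKafkaMetaProxyRequest* meta = request.mutable_kafka_meta_request();
        meta->mutable_kafka_info()->set_brokers("127.0.0.1:9092");  // placeholder
        meta->mutable_kafka_info()->set_topic("my_topic");          // placeholder
        PProxyResult response;
        stub.get_info(&cntl, &request, &response, nullptr /* nullptr = blocking call */);
        if (cntl.Failed()) {
            return -1;
        }
        return response.kafka_meta_result().partition_ids_size();
    }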
be/src/service/internal_service.h (6 additions & 0 deletions)

@@ -80,6 +80,12 @@ class PInternalServiceImpl : public T {
             PTriggerProfileReportResult* result,
             google::protobuf::Closure* done) override;

+    void get_info(
+            google::protobuf::RpcController* controller,
+            const PProxyRequest* request,
+            PProxyResult* response,
+            google::protobuf::Closure* done) override;
+
 private:
     Status _exec_plan_fragment(brpc::Controller* cntl);
 private:
fe/pom.xml (24 additions & 3 deletions)

@@ -36,6 +36,7 @@ under the License.
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <maven.compiler.source>1.8</maven.compiler.source>
         <maven.compiler.target>1.8</maven.compiler.target>
+        <jprotobuf.version>2.2.11</jprotobuf.version>
     </properties>

     <profiles>
@@ -242,7 +243,8 @@
         <dependency>
             <groupId>com.baidu</groupId>
             <artifactId>jprotobuf</artifactId>
-            <version>1.11.0</version>
+            <version>${jprotobuf.version}</version>
+            <classifier>jar-with-dependencies</classifier>
         </dependency>

         <!-- https://mvnrepository.com/artifact/com.baidu/jprotobuf-rpc-common -->
@@ -256,7 +258,7 @@
        <dependency>
            <groupId>com.baidu</groupId>
            <artifactId>jprotobuf-rpc-core</artifactId>
-           <version>3.5.17</version>
+           <version>3.5.21</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.json/json -->
@@ -358,7 +360,7 @@ under the License.
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
-           <version>2.5.0</version>
+           <version>3.5.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.squareup/protoparser -->
@@ -576,6 +578,7 @@ under the License.
            </plugin>

            <!-- run make to generate Version and builtin -->
+           <!-- also parse the proto for FE -->
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
@@ -596,6 +599,24 @@ under the License.
                        <skip>${skip.plugin}</skip>
                    </configuration>
                </execution>
+               <execution>
+                   <id>gen_proto</id>
+                   <phase>generate-sources</phase>
+                   <goals>
+                       <!-- DO NOT use goal 'java', it will terminate the VM after done -->
+                       <goal>exec</goal>
+                   </goals>
+                   <configuration>
+                       <executable>java</executable>
+                       <arguments>
+                           <argument>-jar</argument>
+                           <argument>${settings.localRepository}/com/baidu/jprotobuf/${jprotobuf.version}/jprotobuf-${jprotobuf.version}-jar-with-dependencies.jar</argument>
+                           <argument>--java_out=${palo.home}/gensrc/build/java/</argument>
+                           <argument>${palo.home}/gensrc/proto/internal_service.proto</argument>
+                       </arguments>
+                       <skip>${skip.plugin}</skip>
+                   </configuration>
+               </execution>
                </executions>
            </plugin>
fe/src/main/java/org/apache/doris/common/Status.java (4 additions & 4 deletions)

@@ -17,7 +17,7 @@

 package org.apache.doris.common;

-import org.apache.doris.rpc.PStatus;
+import org.apache.doris.proto.PStatus;
 import org.apache.doris.thrift.TStatus;
 import org.apache.doris.thrift.TStatusCode;

@@ -82,9 +82,9 @@ public void setStatus(String msg) {
     }

     public void setPstatus(PStatus status) {
-        this.errorCode = TStatusCode.findByValue(status.code);
-        if (status.msgs != null && !status.msgs.isEmpty()) {
-            this.errorMsg = status.msgs.get(0);
+        this.errorCode = TStatusCode.findByValue(status.status_code);
+        if (status.error_msgs != null && !status.error_msgs.isEmpty()) {
+            this.errorMsg = status.error_msgs.get(0);
         }
     }
(file name not captured in this view)

@@ -17,26 +17,31 @@

 package org.apache.doris.common.proc;

-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.util.Counter;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.RuntimeProfile;
+import org.apache.doris.proto.PTriggerProfileReportResult;
+import org.apache.doris.proto.PUniqueId;
 import org.apache.doris.qe.QueryStatisticsItem;
-import org.apache.doris.rpc.*;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.rpc.PTriggerProfileReportRequest;
+import org.apache.doris.rpc.RpcException;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TUniqueId;

+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

 import java.util.Collection;
+import java.util.Formatter;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
@@ -198,7 +203,9 @@ private void triggerProfileReport(Collection<QueryStatisticsItem> items, boolean
         }
         // specified query instance which will report.
         if (!allQuery) {
-            final PUniqueId pUId = new PUniqueId(instanceInfo.getInstanceId());
+            final PUniqueId pUId = new PUniqueId();
+            pUId.hi = instanceInfo.getInstanceId().hi;
+            pUId.lo = instanceInfo.getInstanceId().lo;
             request.addInstanceId(pUId);
         }
     }
@@ -230,11 +237,11 @@ private void recvResponse(List<Pair<Request, Future<PTriggerProfileReportResult>
             try {
                 final PTriggerProfileReportResult result
                         = pair.second.get(2, TimeUnit.SECONDS);
-                final TStatusCode code = TStatusCode.findByValue(result.status.code);
+                final TStatusCode code = TStatusCode.findByValue(result.status.status_code);
                 if (code != TStatusCode.OK) {
                     String errMsg = "";
-                    if (result.status.msgs != null && !result.status.msgs.isEmpty()) {
-                        errMsg = result.status.msgs.get(0);
+                    if (result.status.error_msgs != null && !result.status.error_msgs.isEmpty()) {
+                        errMsg = result.status.error_msgs.get(0);
                     }
                     throw new AnalysisException(reasonPrefix + " backend:" + pair.first.getAddress()
                             + " reason:" + errMsg);
(file name not captured in this view)

@@ -18,7 +18,7 @@
 package org.apache.doris.common.util;

 import org.apache.doris.common.Pair;
-import org.apache.doris.rpc.PUniqueId;
+import org.apache.doris.proto.PUniqueId;
 import org.apache.doris.thrift.TUniqueId;

 import java.io.PrintWriter;