Skip to content

Commit

Permalink
feat: run and explain interface rfc
Browse files Browse the repository at this point in the history
  • Loading branch information
jingchen2222 committed Aug 14, 2021
1 parent da4c50f commit 29e2680
Show file tree
Hide file tree
Showing 17 changed files with 118 additions and 138 deletions.
3 changes: 0 additions & 3 deletions .github/workflows/cicd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ jobs:
- name: cleanup
if: always()
run: |
source /etc/profile.d/enable-rh.sh
git clean -dfx
sql_sdk_test:
Expand All @@ -90,7 +89,6 @@ jobs:
- name: cleanup
if: always()
run: |
source /etc/profile.d/enable-rh.sh
git clean -dfx
sql_cluster_test:
Expand All @@ -117,7 +115,6 @@ jobs:
- name: cleanup
if: always()
run: |
source /etc/profile.d/enable-rh.sh
git clean -dfx
publish-test-results:
Expand Down
4 changes: 2 additions & 2 deletions hybridse/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
## [Unreleased]

### SQL Syntax
- Support parameterized query [#170](https://github.com/4paradigm/HybridSE/issues/170)
- Support parameterized query under BatchMode [#170](https://github.com/4paradigm/HybridSE/issues/170)
- `nvl` & `nvl2`: [#190](https://github.com/4paradigm/HybridSE/pull/190)

## [0.2.1] - 2021-08-08
## [0.2.1] - 2021-08-06w
### Feature
+ Add `VARCHAR` Type [#196](https://github.com/4paradigm/HybridSE/issues/196)

Expand Down
4 changes: 1 addition & 3 deletions hybridse/examples/toydb/src/tablet/tablet_server_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ void TabletServerImpl::Query(RpcController* ctrl, const QueryRequest* request,
return;
}
vm::RequestRunSession session;
session.SetParameterSchema(request->parameter_schema());
{
base::Status base_status;
bool ok = engine_->Get(request->sql(), request->db(), session,
Expand All @@ -218,10 +217,9 @@ void TabletServerImpl::Query(RpcController* ctrl, const QueryRequest* request,
if (request->is_debug()) {
session.EnableDebug();
}
codec::Row parameter(request->parameter_row());
codec::Row row(request->row());
codec::Row output;
int32_t ret = session.Run(request->task_id(), row, parameter, &output);
int32_t ret = session.Run(request->task_id(), row, &output);
if (ret != 0) {
LOG(WARNING) << "fail to run sql " << request->sql();
status->set_code(common::kSqlError);
Expand Down
46 changes: 26 additions & 20 deletions hybridse/include/vm/engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,17 +178,12 @@ class RunSession {

/// Bind this run session with specific procedure
void SetSpName(const std::string& sp_name) { sp_name_ = sp_name; }
/// Bing the run session with specific parameter schema
void SetParameterSchema(const codec::Schema& schema) { parameter_schema_ = schema; }
/// Return query parameter schema.
virtual const Schema& GetParameterSchema() const { return parameter_schema_; }
/// Return the engine mode of this run session
EngineMode engine_mode() const { return engine_mode_; }

protected:
std::shared_ptr<hybridse::vm::CompileInfo> compile_info_;
hybridse::vm::EngineMode engine_mode_;
codec::Schema parameter_schema_;
bool is_debug_;
std::string sp_name_;
friend Engine;
Expand All @@ -198,18 +193,24 @@ class RunSession {
class BatchRunSession : public RunSession {
public:
explicit BatchRunSession(bool mini_batch = false)
: RunSession(kBatchMode), mini_batch_(mini_batch) {}
: RunSession(kBatchMode), parameter_schema_() {}
~BatchRunSession() {}
/// \brief Query sql with parameter row in batch mode.
/// Query results will be returned as std::vector<Row> in output
int32_t Run(const Row& parameter_row, std::vector<Row>& output, // NOLINT
uint64_t limit = 0);
/// \brief Query sql in batch mode.
/// Return query result as TableHandler pointer.
std::shared_ptr<TableHandler> Run();
/// \brief Parameterized Query sql in batch mode.
/// Return query result as TableHandler pointer.
std::shared_ptr<TableHandler> Run(const Row& parameter_row);

/// Bing the run session with specific parameter schema
void SetParameterSchema(const codec::Schema& schema) { parameter_schema_ = schema; }
/// Return query parameter schema.
virtual const Schema& GetParameterSchema() const { return parameter_schema_; }
private:
const bool mini_batch_;
codec::Schema parameter_schema_;
};
/// \brief RequestRunSession is a kind of RunSession designed for request mode query.
///
Expand All @@ -223,17 +224,15 @@ class RequestRunSession : public RunSession {
/// \param in_row request row
/// \param output query result will be returned as Row in output
/// \return `0` if run successfully else negative integer
int32_t Run(const Row& in_row, const Row& parameter_row,
Row* output); // NOLINT
int32_t Run(const Row& in_row, Row* output); // NOLINT

/// \brief Run a task specified by task_id in request mode.
///
/// \param task_id: task id of task
/// \param in_row: request row
/// \param[out] output: result is written to this variable
/// \return `0` if run successfully else negative integer
int32_t Run(uint32_t task_id, const Row& in_row, const Row& parameter_row,
Row* output); // NOLINT
int32_t Run(uint32_t task_id, const Row& in_row, Row* output); // NOLINT

/// \brief Return the schema of request row
virtual const Schema& GetRequestSchema() const {
Expand Down Expand Up @@ -266,16 +265,14 @@ class BatchRequestRunSession : public RunSession {
/// \param request_batch: a batch of request rows
/// \param output: query results will be returned as std::vector<Row> in output
/// \return 0 if runs successfully else negative integer
int32_t Run(const std::vector<Row>& request_batch, const Row& parameter_row,
std::vector<Row>& output); // NOLINT
int32_t Run(const std::vector<Row>& request_batch, std::vector<Row>& output); // NOLINT

/// \brief Run a task specified by task_id in request mode.
/// \param id: id of task
/// \param request_batch: a batch of request rows
/// \param output: query results will be returned as std::vector<Row> in output
/// \return 0 if runs successfully else negative integer
int32_t Run(const uint32_t id, const std::vector<Row>& request_batch, const Row& parameter_row,
std::vector<Row>& output); // NOLINT
int32_t Run(const uint32_t id, const std::vector<Row>& request_batch, std::vector<Row>& output); // NOLINT

/// \brief Add common column idx
void AddCommonColumnIdx(size_t idx) { common_column_indices_.insert(idx); }
Expand Down Expand Up @@ -349,6 +346,13 @@ class Engine {
/// The results are returned as ExplainOutput in explain_output.
/// The success or fail status message is returned as Status in status.
/// TODO: base::Status* status -> base::Status& status
bool Explain(const std::string& sql, const std::string& db,
EngineMode engine_mode, ExplainOutput* explain_output, base::Status* status);
/// \brief Explain sql compiling result.
///
/// The results are returned as ExplainOutput in explain_output.
/// The success or fail status message is returned as Status in status.
/// TODO: base::Status* status -> base::Status& status
bool Explain(const std::string& sql, const std::string& db,
EngineMode engine_mode, const codec::Schema& parameter_schema,
ExplainOutput* explain_output,
Expand All @@ -357,10 +361,8 @@ class Engine {
/// \brief Same as above, but allowing compiling with configuring common column indices.
///
/// The common column indices are used for common column optimization under EngineMode::kBatchRequestMode
bool Explain(const std::string& sql, const std::string& db,
EngineMode engine_mode, const codec::Schema& parameter_schema,
const std::set<size_t>& common_column_indices,
ExplainOutput* explain_output, base::Status* status);
bool Explain(const std::string& sql, const std::string& db, EngineMode engine_mode,
const std::set<size_t>& common_column_indices, ExplainOutput* explain_output, base::Status* status);

/// \brief Update engine's catalog
inline void UpdateCatalog(std::shared_ptr<Catalog> cl) {
Expand All @@ -384,6 +386,10 @@ class Engine {
std::shared_ptr<CompileInfo> info,
base::Status& status); // NOLINT

bool Explain(const std::string& sql, const std::string& db,
EngineMode engine_mode, const codec::Schema& parameter_schema,
const std::set<size_t>& common_column_indices,
ExplainOutput* explain_output, base::Status* status);
std::shared_ptr<Catalog> cl_;
EngineOptions options_;
base::SpinMutex mu_;
Expand Down
1 change: 1 addition & 0 deletions hybridse/src/proto/fe_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ option java_outer_classname = "Common";
enum StatusCode {
kOk = 0;
kRunning = 1;
kRunError = 2;
kTypeError = 8;
kFileIOError = 9;
kUnSupport = 10;
Expand Down
7 changes: 6 additions & 1 deletion hybridse/src/testing/engine_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,12 @@ Status EngineTestRunner::Compile() {
if (hybridse::sqlcase::SqlCase::IsDebug() || sql_case_.debug()) {
session_->EnableDebug();
}
session_->SetParameterSchema(parameter_schema_);
if (session_->engine_mode() == kBatchMode) {
dynamic_pointer_cast<BatchRunSession>(session_)->SetParameterSchema(parameter_schema_);
} else {
CHECK_TRUE(parameter_schema_.empty(), common::kUnSupport,
"Request or BatchRequest mode do not support parameterized query currently")
}
struct timeval st;
struct timeval et;
gettimeofday(&st, nullptr);
Expand Down
11 changes: 7 additions & 4 deletions hybridse/src/testing/engine_test_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,13 +262,14 @@ class RequestEngineTestRunner : public EngineTestRunner {
auto request_session =
std::dynamic_pointer_cast<RequestRunSession>(session_);
std::string request_name = request_session->GetRequestName();
CHECK_TRUE(parameter_rows_.empty(), common::kUnSupport, "Request do not support parameterized query currently")
Row parameter = parameter_rows_.empty() ? Row() : parameter_rows_[0];
for (auto in_row : request_rows_) {
Row out_row;
int run_ret = request_session->Run(in_row, parameter, &out_row);
int run_ret = request_session->Run(in_row, &out_row);
if (run_ret != 0) {
return_code_ = ENGINE_TEST_RET_EXECUTION_ERROR;
return Status(kSqlError, "Run request session failed");
return Status(common::kRunError, "Run request session failed");
}
if (!has_batch_request) {
CHECK_TRUE(AddRowIntoTable(request_name, in_row), kSqlError,
Expand Down Expand Up @@ -417,8 +418,10 @@ class BatchRequestEngineTestRunner : public EngineTestRunner {
auto request_session =
std::dynamic_pointer_cast<BatchRequestRunSession>(session_);
CHECK_TRUE(request_session != nullptr, common::kSqlError);
Row parameter = parameter_rows_.empty() ? Row() : parameter_rows_[0];
int run_ret = request_session->Run(request_rows_, parameter, *outputs);
// Currently parameterized query un-support currently
CHECK_TRUE(parameter_rows_.empty(), common::kUnSupport,
"Batch request do not support parameterized query currently")
int run_ret = request_session->Run(request_rows_, *outputs);
if (run_ret != 0) {
return_code_ = ENGINE_TEST_RET_EXECUTION_ERROR;
return Status(kSqlError, "Run batch request session failed");
Expand Down
69 changes: 44 additions & 25 deletions hybridse/src/vm/engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,19 +148,22 @@ bool Engine::IsCompatibleCache(RunSession& session, // NOLINT
return false;
}
auto& cache_ctx = std::dynamic_pointer_cast<SqlCompileInfo>(info)->get_sql_context();
if (cache_ctx.parameter_types.size() != session.parameter_schema_.size()) {
status = Status(common::kSqlError, "Inconsistent cache parameter schema size");
return false;
}
for (int i = 0; i < session.parameter_schema_.size(); i++) {
if (cache_ctx.parameter_types.Get(i).type() != session.GetParameterSchema().Get(i).type()) {
status = Status(common::kSqlError, "Inconsistent cache parameter type, expect " +
session.GetParameterSchema().Get(i).DebugString() + " but get " +
cache_ctx.parameter_types.Get(i).DebugString());

if (session.engine_mode() == kBatchMode) {
auto batch_sess = dynamic_cast<BatchRunSession*>(&session);
if (cache_ctx.parameter_types.size() != batch_sess->GetParameterSchema().size()) {
status = Status(common::kSqlError, "Inconsistent cache parameter schema size");
return false;
}
}
if (session.engine_mode() == kBatchRequestMode) {
for (int i = 0; i < batch_sess->GetParameterSchema().size(); i++) {
if (cache_ctx.parameter_types.Get(i).type() != batch_sess->GetParameterSchema().Get(i).type()) {
status = Status(common::kSqlError, "Inconsistent cache parameter type, expect " +
batch_sess->GetParameterSchema().Get(i).DebugString() +
" but get " + cache_ctx.parameter_types.Get(i).DebugString());
return false;
}
}
} else if (session.engine_mode() == kBatchRequestMode) {
auto batch_req_sess = dynamic_cast<BatchRequestRunSession*>(&session);
if (batch_req_sess == nullptr) {
return false;
Expand Down Expand Up @@ -200,10 +203,10 @@ bool Engine::Get(const std::string& sql, const std::string& db, RunSession& sess
sql_context.enable_batch_window_parallelization = options_.is_enable_batch_window_parallelization();
sql_context.enable_expr_optimize = options_.is_enable_expr_optimize();
sql_context.jit_options = options_.jit_options();
sql_context.parameter_types = session.parameter_schema_;

auto batch_req_sess = dynamic_cast<BatchRequestRunSession*>(&session);
if (batch_req_sess) {
if (session.engine_mode() == kBatchMode) {
sql_context.parameter_types = dynamic_cast<BatchRunSession*>(&session)->GetParameterSchema();
} else if (session.engine_mode() == kBatchRequestMode) {
auto batch_req_sess = dynamic_cast<BatchRequestRunSession*>(&session);
sql_context.batch_request_info.common_column_indices = batch_req_sess->common_column_indices();
}

Expand Down Expand Up @@ -248,6 +251,10 @@ bool Engine::Explain(const std::string& sql, const std::string& db, EngineMode e
LOG(WARNING) << "common column config can only be valid in batch request mode";
return false;
}
if (!parameter_schema.empty() && engine_mode != kBatchMode) {
LOG(WARNING) << "parameterized query can only be valid in batch mode";
return false;
}
SqlContext ctx;
ctx.engine_mode = engine_mode;
ctx.sql = sql;
Expand Down Expand Up @@ -300,11 +307,22 @@ bool Engine::Explain(const std::string& sql, const std::string& db, EngineMode e
}
return true;
}
bool Engine::Explain(const std::string& sql, const std::string& db, EngineMode engine_mode,
ExplainOutput* explain_output, base::Status* status) {
const codec::Schema empty_schema;
return Explain(sql, db, engine_mode, empty_schema, {}, explain_output, status);
}

bool Engine::Explain(const std::string& sql, const std::string& db, EngineMode engine_mode,
const codec::Schema& parameter_schema, ExplainOutput* explain_output, base::Status* status) {
return Explain(sql, db, engine_mode, parameter_schema, {}, explain_output, status);
}
bool Engine::Explain(const std::string& sql, const std::string& db, EngineMode engine_mode,
const std::set<size_t>& common_column_indices,
ExplainOutput* explain_output, base::Status* status) {
const codec::Schema empty_schema;
return Explain(sql, db, engine_mode, empty_schema, common_column_indices, explain_output, status);
}

void Engine::ClearCacheLocked(const std::string& db) {
std::lock_guard<base::SpinMutex> lock(mu_);
Expand Down Expand Up @@ -363,12 +381,12 @@ bool RunSession::SetCompileInfo(const std::shared_ptr<CompileInfo>& compile_info
return true;
}

int32_t RequestRunSession::Run(const Row& in_row, const Row& parameter_row, Row* out_row) {
int32_t RequestRunSession::Run(const Row& in_row, Row* out_row) {
DLOG(INFO) << "Request Row Run with main task";
return Run(std::dynamic_pointer_cast<SqlCompileInfo>(compile_info_)->get_sql_context().cluster_job.main_task_id(),
in_row, parameter_row, out_row);
in_row, out_row);
}
int32_t RequestRunSession::Run(const uint32_t task_id, const Row& in_row, const Row& parameter_row, Row* out_row) {
int32_t RequestRunSession::Run(const uint32_t task_id, const Row& in_row, Row* out_row) {
auto task = std::dynamic_pointer_cast<SqlCompileInfo>(compile_info_)
->get_sql_context()
.cluster_job.GetTask(task_id)
Expand All @@ -379,7 +397,7 @@ int32_t RequestRunSession::Run(const uint32_t task_id, const Row& in_row, const
}
DLOG(INFO) << "Request Row Run with task_id " << task_id;
RunnerContext ctx(&std::dynamic_pointer_cast<SqlCompileInfo>(compile_info_)->get_sql_context().cluster_job, in_row,
parameter_row, sp_name_, is_debug_);
sp_name_, is_debug_);
auto output = task->RunWithCache(ctx);
if (!output) {
LOG(WARNING) << "run request plan output is null";
Expand All @@ -392,15 +410,14 @@ int32_t RequestRunSession::Run(const uint32_t task_id, const Row& in_row, const
return -1;
}

int32_t BatchRequestRunSession::Run(const std::vector<Row>& request_batch, const Row& parameter_row,
std::vector<Row>& output) {
int32_t BatchRequestRunSession::Run(const std::vector<Row>& request_batch, std::vector<Row>& output) {
return Run(std::dynamic_pointer_cast<SqlCompileInfo>(compile_info_)->get_sql_context().cluster_job.main_task_id(),
request_batch, parameter_row, output);
request_batch, output);
}
int32_t BatchRequestRunSession::Run(const uint32_t id, const std::vector<Row>& request_batch, const Row& parameter_row,
int32_t BatchRequestRunSession::Run(const uint32_t id, const std::vector<Row>& request_batch,
std::vector<Row>& output) {
RunnerContext ctx(&std::dynamic_pointer_cast<SqlCompileInfo>(compile_info_)->get_sql_context().cluster_job,
request_batch, parameter_row, sp_name_, is_debug_);
request_batch, sp_name_, is_debug_);
auto task =
std::dynamic_pointer_cast<SqlCompileInfo>(compile_info_)->get_sql_context().cluster_job.GetTask(id).GetRoot();
if (nullptr == task) {
Expand All @@ -419,7 +436,9 @@ int32_t BatchRequestRunSession::Run(const uint32_t id, const std::vector<Row>& r
ctx.ClearCache();
return 0;
}

std::shared_ptr<TableHandler> BatchRunSession::Run() {
return Run(Row());
}
std::shared_ptr<TableHandler> BatchRunSession::Run(const Row& parameter_row) {
RunnerContext ctx(&std::dynamic_pointer_cast<SqlCompileInfo>(compile_info_)->get_sql_context().cluster_job,
parameter_row, is_debug_);
Expand Down
2 changes: 0 additions & 2 deletions hybridse/src/vm/engine_compile_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,6 @@ TEST_F(EngineCompileTest, ExplainBatchRequestTest) {
AddTable(db, table_def);
catalog->AddDatabase(db);

codec::Schema empty_parameter_schema;
std::set<size_t> common_column_indices({2, 3, 5});
std::string sql =
"select col0, col1, col2, sum(col1) over w1, \n"
Expand All @@ -396,7 +395,6 @@ TEST_F(EngineCompileTest, ExplainBatchRequestTest) {
ExplainOutput explain_output;
base::Status status;
ASSERT_TRUE(engine.Explain(sql, "simple_db", kBatchRequestMode,
empty_parameter_schema,
common_column_indices, &explain_output,
&status));
ASSERT_TRUE(status.isOK()) << status;
Expand Down
Loading

0 comments on commit 29e2680

Please sign in to comment.