diff --git a/.github/workflows/cicd.yaml b/.github/workflows/cicd.yaml index b4a174aa581..f5951d35d18 100644 --- a/.github/workflows/cicd.yaml +++ b/.github/workflows/cicd.yaml @@ -11,6 +11,7 @@ on: env: GIT_SUBMODULE_STRATEGY: recursive + HYBRIDSE_SOURCE: download jobs: build_and_cpp_ut: @@ -23,7 +24,7 @@ jobs: - uses: actions/checkout@v1 - name: build run: | - bash steps/init_env.sh + bash steps/init_env.sh ${{ env.HYBRIDSE_SOURCE }} mkdir -p build source /root/.bashrc && cd build && cmake -DSQL_PYSDK_ENABLE=OFF -DSQL_JAVASDK_ENABLE=OFF -DTESTING_ENABLE=ON .. && make -j$(nproc) && cd ../ rm -rf thirdparty @@ -65,7 +66,7 @@ jobs: - uses: actions/checkout@v1 - name: build run: | - bash steps/init_env.sh + bash steps/init_env.sh ${{ env.HYBRIDSE_SOURCE }} mkdir -p build source /root/.bashrc && cd build && cmake -DSQL_PYSDK_ENABLE=OFF -DSQL_JAVASDK_ENABLE=OFF -DTESTING_ENABLE=ON .. && make -j$(nproc) sql_sdk_test && cd ../ - name: run sql_sdk_test @@ -87,7 +88,7 @@ jobs: - uses: actions/checkout@v1 - name: build run: | - bash steps/init_env.sh + bash steps/init_env.sh ${{ env.HYBRIDSE_SOURCE }} mkdir -p build source /root/.bashrc && cd build && cmake -DSQL_PYSDK_ENABLE=OFF -DSQL_JAVASDK_ENABLE=OFF -DTESTING_ENABLE=ON .. && make -j$(nproc) sql_cluster_test && cd ../ - name: run sql_sdk_test @@ -110,7 +111,7 @@ jobs: - name: build jsdk and package run: | - bash steps/init_env.sh + bash steps/init_env.sh ${{ env.HYBRIDSE_SOURCE }} mkdir -p build source /root/.bashrc && cd build && cmake -DSQL_PYSDK_ENABLE=OFF -DSQL_JAVASDK_ENABLE=ON -DTESTING_ENABLE=OFF .. && make -j$(nproc) sql_javasdk_package openmldb && cd ../ @@ -140,7 +141,7 @@ jobs: - uses: actions/checkout@v1 - name: build pysdk and sqlalchemy run: | - bash steps/init_env.sh + bash steps/init_env.sh ${{ env.HYBRIDSE_SOURCE }} mkdir -p build source /root/.bashrc && cd build && cmake -DSQL_PYSDK_ENABLE=ON -DSQL_JAVASDK_ENABLE=OFF -DTESTING_ENABLE=OFF .. 
&& make -j$(nproc) sqlalchemy_openmldb openmldb && cd ../ - name: test sqlalchemy @@ -230,7 +231,7 @@ jobs: - name: upload to maven run: | - bash steps/init_env.sh + bash steps/init_env.sh ${{ env.HYBRIDSE_SOURCE }} VERSION=$(echo "${{ github.ref }}" | sed -e 's,.*/\(.*\),\1,') VERSION=$(echo $VERSION | sed -e 's/^v//') sh steps/package_openmldb_javasdk.sh $VERSION diff --git a/.github/workflows/hybridse-ci.yml b/.github/workflows/hybridse-ci.yml index be673888b87..217a6f1566a 100644 --- a/.github/workflows/hybridse-ci.yml +++ b/.github/workflows/hybridse-ci.yml @@ -5,7 +5,7 @@ on: branches: - main tags: - - v* + - hybridse-v* pull_request: paths: - .github/workflows/hybridse-ci.yml @@ -86,10 +86,10 @@ jobs: name: lib-artifacts - name: Prepare Maven Release Deploy - if: startsWith(github.ref, 'refs/tags/v') + if: startsWith(github.ref, 'refs/tags/hybridse-v') run: | VERSION=$(echo "${{ github.ref }}" | sed -e 's,.*/\(.*\),\1,') - VERSION=$(echo $VERSION | sed -e 's/^v//') + VERSION=$(echo $VERSION | sed -e 's/^hybridse-v//') ./java/prepare_release.sh $VERSION - name: Publish Java Library @@ -106,9 +106,9 @@ jobs: - name: Create Archive if: ${{ github.event_name == 'push' }} run: | - if [[ "${{ github.ref }}" == "refs/tags/v"* ]]; then + if [[ "${{ github.ref }}" == "refs/tags/hybridse-v"* ]]; then VERSION=$(echo "${{ github.ref }}" | sed -e 's,.*/\(.*\),\1,') - VERSION=$(echo $VERSION | sed -e 's/^v//') + VERSION=$(echo $VERSION | sed -e 's/^hybridse-v//') HYBRIDSE_VERSION=$VERSION ./tools/hybridse_deploy.sh else ./tools/hybridse_deploy.sh @@ -188,10 +188,10 @@ jobs: name: lib-artifacts - name: Prepare Maven Release Deploy - if: startsWith(github.ref, 'refs/tags/v') + if: startsWith(github.ref, 'refs/tags/hybridse-v') run: | VERSION=$(echo "${{ github.ref }}" | sed -e 's,.*/\(.*\),\1,') - VERSION=$(echo $VERSION | sed -e 's/^v//') + VERSION=$(echo $VERSION | sed -e 's/^hybridse-v//') ./java/prepare_release.sh $VERSION - name: Publish Java Library @@ -209,9 +209,9 @@ jobs: - name: Create Archive if: ${{ github.event_name == 'push' }} run: | - if [[ "${{ github.ref }}" == "refs/tags/v"* ]]; then + if [[ "${{ github.ref }}" == "refs/tags/hybridse-v"* ]]; then VERSION=$(echo "${{ github.ref }}" | sed -e 's,.*/\(.*\),\1,') - VERSION=$(echo $VERSION | sed -e 's/^v//') + VERSION=$(echo $VERSION | sed -e 's/^hybridse-v//') HYBRIDSE_VERSION=$VERSION ./tools/hybridse_deploy.sh else ./tools/hybridse_deploy.sh @@ -267,10 +267,10 @@ jobs: cp libhybridse_*.dylib java/hybridse-native/src/main/resources/ - name: Prepare Maven Release Deploy - if: startsWith(github.ref, 'refs/tags/v') + if: startsWith(github.ref, 'refs/tags/hybridse-v') run: | VERSION=$(echo "${{ github.ref }}" | sed -e 's,.*/\(.*\),\1,') - VERSION=$(echo $VERSION | sed -e 's/^v//') + VERSION=$(echo $VERSION | sed -e 's/^hybridse-v//') ./java/prepare_release.sh $VERSION - name: Publish Java Library @@ -289,7 +289,7 @@ jobs: runs-on: ubuntu-latest needs: ["linux-build", "macos-build"] if: > - success() && startsWith(github.ref, 'refs/tags/v') + success() && startsWith(github.ref, 'refs/tags/hybridse-v') steps: - name: Download Release Artifacts uses: actions/download-artifact@v2 @@ -297,7 +297,7 @@ jobs: name: release-artifacts - name: Release - if: ${{ startsWith(github.ref, 'refs/tags/v') }} + if: ${{ startsWith(github.ref, 'refs/tags/hybridse-v') }} uses: softprops/action-gh-release@v1 with: files: | diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 00000000000..a0a832a7f5d --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,47 
@@ +# Changelog + +## [Unreleased] + +### Feature +- Support parameterized query under BatchMode [#262](https://github.com/4paradigm/OpenMLDB/issues/262), [#168](https://github.com/4paradigm/OpenMLDB/issues/168) + +### SQL Syntax +- `nvl` & `nvl2`: [#238](https://github.com/4paradigm/OpenMLDB/issues/238) +- bitwise operators: `&`, `|`, `^`, `~` [#244](https://github.com/4paradigm/OpenMLDB/pull/244) + +## [0.2.2] - 2021-08-08 +### Feature ++ Add `VARCHAR` Type [#237](https://github.com/4paradigm/OpenMLDB/issues/237) + +### Bug Fix +- Fix invalid back quote identifier name [#263](https://github.com/4paradigm/OpenMLDB/issues/263) + can't write as multiple path style (e.g a.b) now +- InsertPreparedStatement set month by mistake when using Date type [#200](https://github.com/4paradigm/OpenMLDB/pull/200) + +### Note: +`OPTIONS` can't write as multiple path style (e.g a.b) now + +## [0.2.0] - 2021-07-22 +### Features + ++ Refactor front-end using [zetasql](https://github.com/jingchen2222/zetasql). Thus OpenMLDB can support more SQL syntaxes and provide friendlier syntax error messages. ++ Better code style and comments ++ Add APIServer module. Users can access OpenMLDB via the REST API. [#48](https://github.com/4paradigm/OpenMLDB/issues/48) + +### SQL Syntax + +Changed +- `table options` syntax: [#103](https://github.com/4paradigm/HybridSE/issues/103) +- `lead` method: [#136](https://github.com/4paradigm/HybridSE/pull/136) + +Removed +- `||` and `&&` as logical operators: [#264](https://github.com/4paradigm/OpenMLDB/issues/264) +- `at` function: [#265](https://github.com/4paradigm/OpenMLDB/issues/265) + +### Note +- openmldb-0.2.0-linux.tar.gz targets x86_64 +- aarch64 artifacts are considered experimental + +[Unreleased]: https://github.com/4paradigm/OpenMLDB/compare/0.2.2...HEAD +[0.2.2]: https://github.com/4paradigm/OpenMLDB/compare/v0.2.0...0.2.2 +[0.2.0]: https://github.com/4paradigm/OpenMLDB/compare/v0.1.5-pre...v0.2.0 diff --git a/cases/function/expression/test_arithmetic.yaml b/cases/function/expression/test_arithmetic.yaml index b3a9c952d5a..13627c7d732 100644 --- a/cases/function/expression/test_arithmetic.yaml +++ b/cases/function/expression/test_arithmetic.yaml @@ -595,7 +595,6 @@ cases: NULL, NULL, NULL, NULL, NULL] - id: bitwise_operators desc: bitwise and/or/xor - mode: hybridse-only inputs: - columns: ["c1 int16","c2 int32","c3 bigint", "c6 timestamp"] indexs: ["index1:c3:c6"] @@ -619,14 +618,13 @@ cases: - [ 0, 5, 15, 0, 10, 0 ] - id: bitwise_operators_fail desc: bitwise and/or/xor, fail on non-integral operands - mode: hybridse-only inputs: - columns: [ "c0 int", "c1 bool", "c2 float", "c3 double", "c4 string", "c5 date", "c6 timestamp" ] indexs: ["index1:c0:c6"] rows: - [1, true, 1.0, 2.0, "abc", "2012-8-11", 1590738989000] sql: | - select d[1] d[0] 10 as r1 from {0} + select d[1] d[0] 10 as r1 from {0}; dataProvider: - ['&', '|', '^'] - [ '{0}.c1', '{0}.c2', '{0}.c3', '{0}.c4', '{0}.c5', '{0}.c6' ] @@ -634,28 +632,26 @@ cases: success: false - id: bitwise_operators_not desc: bitwise not - mode: hybridse-only inputs: - columns: ["c1 int16","c2 int32","c3 bigint", "c6 timestamp"] indexs: ["index1:c3:c6"] rows: - [3, 6, 12, 1590738989000] sql: | - select ~c1 as r1, ~c2 as r2, ~c3 as r3 from {0} + select ~c1 as r1, ~c2 as r2, ~c3 as r3 from {0}; expect: columns: [ 'r1 int16', 'r2 int32', 'r3 bigint'] rows: - [ -4, -7, -13 ] - id: bitwise_not_fail desc: bitwise not, fail on non-integral operand - mode: hybridse-only inputs: - columns: [ "c0 int", "c1 bool", "c2 float", "c3 double", "c4 string", "c5
date", "c6 timestamp" ] indexs: ["index1:c0:c6"] rows: - [1, true, 1.0, 2.0, "abc", "2012-8-11", 1590738989000] sql: | - select d[0] d[1] as r1 from {0} + select d[0] d[1] as r1 from {0}; dataProvider: - ['~'] - [ '{0}.c1', '{0}.c2', '{0}.c3', '{0}.c4', '{0}.c5', '{0}.c6' ] @@ -663,28 +659,26 @@ cases: success: false - id: bitwise_null_operands desc: bitwise operation return null if any of operands is null - mode: hybridse-only inputs: - columns: ["c1 int16","c2 int32","c3 bigint", "c4 int16", "c6 timestamp"] indexs: ["index1:c3:c6"] rows: - [3, 6, 12, NULL, 1590738989000] sql: | - select {0}.c1 & {0}.c4 as r1, {0}.c2 | {0}.c4 as r2, {0}.c3 ^ {0}.c4 as r3, ~ {0}.c4 as r4 from {0} + select {0}.c1 & {0}.c4 as r1, {0}.c2 | {0}.c4 as r2, {0}.c3 ^ {0}.c4 as r3, ~ {0}.c4 as r4 from {0}; expect: columns: [ 'r1 int16', 'r2 int32', 'r3 int64', 'r4 int16' ] rows: - [ NULL, NULL, NULL, NULL ] - id: bitwise_const_null_operands desc: bitwise operation return null if any of operands is null - mode: hybridse-only inputs: - columns: ["c1 int16","c2 int32","c3 bigint", "c4 int", "c6 timestamp"] indexs: ["index1:c3:c6"] rows: - [3, 6, 12, NULL, 1590738989000] sql: | - select {0}.c1 & NULL as r1, {0}.c2 | NULL as r2, {0}.c3 ^ NULL as r3, ~ NULL as r4 from {0} + select {0}.c1 & NULL as r1, {0}.c2 | NULL as r2, {0}.c3 ^ NULL as r3, ~ NULL as r4 from {0}; expect: columns: [ 'r1 int16', 'r2 int32', 'r3 int64', 'r4 bool' ] rows: diff --git a/cases/function/expression/test_condition.yaml b/cases/function/expression/test_condition.yaml index db2a0b8743e..cdcd077bacc 100644 --- a/cases/function/expression/test_condition.yaml +++ b/cases/function/expression/test_condition.yaml @@ -286,7 +286,6 @@ cases: success: false - id: 11-2 desc: NVL is synonyms to ifnull - mode: hybridse-only inputs: - columns: ["col1 int","col2 string", "col4 timestamp"] indexs: ["index1:col1:col4"] @@ -305,7 +304,6 @@ cases: - [3, ""] - id: 11-3 desc: NVL-表达式-/0 - mode: hybridse-only inputs: - columns: ["col1 int","col2 int", "col4 timestamp"] indexs: ["index1:col1:col4"] @@ -348,7 +346,6 @@ cases: - id: NVL2-1 desc: NVL2 - mode: hybridse-only inputs: - columns: ["col1 int","col2 int", "col4 timestamp"] indexs: ["index1:col1:col4"] @@ -368,7 +365,6 @@ cases: - id: NVL2-2 desc: NVL2, type not match - mode: hybridse-only inputs: - columns: ["col1 int","col2 int", "col4 timestamp"] indexs: ["index1:col1:col4"] @@ -383,7 +379,6 @@ cases: - id: NVL2-3 desc: NVL2, sub expression - mode: hybridse-only inputs: - columns: ["col1 int","col2 int", "col4 timestamp"] indexs: ["index1:col1:col4"] diff --git a/cases/query/parameterized_query.yaml b/cases/query/parameterized_query.yaml index 34f412a9943..84a7d7b0042 100644 --- a/cases/query/parameterized_query.yaml +++ b/cases/query/parameterized_query.yaml @@ -42,7 +42,7 @@ cases: data: | 0, 1, 5, 1.1, 11.1, 1, 1 0, 2, 5, 2.2, 22.2, 2, 22 - - id: 1 + - id: 1-1 desc: 带参数的Where条件部分命中索引 mode: request-unsupport, offline-unsupport db: db1 @@ -71,6 +71,37 @@ cases: data: | 1, 4, 55, 4.4, 44.4, 2, 4444 2, 5, 55, 5.5, 55.5, 3, aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa + - id: 1-2 + desc: 带参数的Where条件部分命中索引,参数包含字符串 + mode: request-unsupport, offline-unsupport + db: db1 + sql: | + SELECT col0, col1, col2, col3, col4, col5, col6 FROM {0} where col6=? 
and col1 < ?; + parameters: + columns: [ "p1 string" , "p2 bigint" ] + rows: + - [ aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa, 7 ] + inputs: + - schema: col0:string, col1:int64, col2:int16, col3:float, col4:double, col5:int64, col6:string + index: index1:col6:col5 + data: | + 0, 1, 5, 1.1, 11.1, 1, 1 + 0, 2, 5, 2.2, 22.2, 2, 22 + 1, 3, 55, 3.3, 33.3, 1, 333 + 1, 4, 55, 4.4, 44.4, 2, 4444 + 2, 5, 55, 5.5, 55.5, 3, aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa + 2, 6, 55, 6.6, 66.6, 4, aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa + 2, 7, 55, 7.7, 77.7, 5, aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa + batch_plan: | + SIMPLE_PROJECT(sources=(col0, col1, col2, col3, col4, col5, col6)) + FILTER_BY(condition=col1 < ?2, left_keys=(), right_keys=(), index_keys=(?1)) + DATA_PROVIDER(type=Partition, table=auto_t0, index=index1) + expect: + schema: col0:string, col1:int64, col2:int16, col3:float, col4:double, col5:int64, col6:string + order: col1 + data: | + 2, 5, 55, 5.5, 55.5, 3, aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa + 2, 6, 55, 6.6, 66.6, 4, aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa - id: 2-1 desc: 带参数的Where条件未命中索引 mode: request-unsupport, offline-unsupport diff --git a/hybridse/CHANGELOG.md b/hybridse/CHANGELOG.md index 35bd214a056..47637853e11 100644 --- a/hybridse/CHANGELOG.md +++ b/hybridse/CHANGELOG.md @@ -1,18 +1,21 @@ # Changelog ## [Unreleased] - +### Feature +- Support parameterized query under BatchMode [#262](https://github.com/4paradigm/OpenMLDB/issues/262) ### SQL Syntax -- Support parameterized query [#170](https://github.com/4paradigm/HybridSE/issues/170) -- `nvl` & `nvl2`: [#190](https://github.com/4paradigm/HybridSE/pull/190) -- bitwise operators: [#244](https://github.com/4paradigm/OpenMLDB/pull/244) +- `nvl` & `nvl2`: [#238](https://github.com/4paradigm/OpenMLDB/issues/238) +- bitwise operators: `&`, `|`, `^`, `~` [#244](https://github.com/4paradigm/OpenMLDB/pull/244) -## [0.2.1] - 2021-08-08 +## [0.2.1] - 2021-08-06 ### Feature -+ Add `VARCHAR` Type [#196](https://github.com/4paradigm/HybridSE/issues/196) ++ Add `VARCHAR` Type [#237](https://github.com/4paradigm/OpenMLDB/issues/237) ### Bug Fix -- Fix invalid back qoute identifier name [#192](https://github.com/4paradigm/HybridSE/issues/192). Note: option key can't write as multiple path style (e.g a.b) now +- Fix invalid back qoute identifier name [#263](https://github.com/4paradigm/OpenMLDB/issues/263). 
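A minimal usage sketch of the parameterized BatchMode query noted in the Feature entry above, written against the reworked BatchRunSession declarations from hybridse/include/vm/engine.h later in this diff; `catalog` and `parameter_row` are placeholders for an initialized catalog and an already-encoded parameter row, and the SQL text is illustrative only:

    #include "vm/engine.h"  // hybridse engine API; other includes omitted in this sketch

    // Assumed to exist: std::shared_ptr<hybridse::vm::Catalog> catalog; hybridse::codec::Row parameter_row;
    hybridse::vm::EngineOptions options;
    hybridse::vm::Engine engine(catalog, options);
    hybridse::vm::BatchRunSession session;

    // Declare the types of the two '?' placeholders before compiling the query.
    hybridse::codec::Schema parameter_schema;
    parameter_schema.Add()->set_type(hybridse::type::kVarchar);
    parameter_schema.Add()->set_type(hybridse::type::kInt64);
    session.SetParameterSchema(parameter_schema);

    hybridse::base::Status status;
    if (!engine.Get("SELECT col0, col1 FROM t1 WHERE col6 = ? AND col1 < ?;", "db1", session, status)) {
        LOG(WARNING) << "compile failed: " << status.msg;
    } else {
        std::vector<hybridse::codec::Row> outputs;
        if (0 != session.Run(parameter_row, outputs)) {  // parameterized overload
            LOG(WARNING) << "run failed";
        }
        // session.Run(outputs) is the new overload for queries without parameters.
    }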
+ +### Note: +`OPTIONS` can't write as multiple path style (e.g a.b) now ## [0.2.0] - 2021-07-16 ### SQL Syntax diff --git a/hybridse/CPPLINT.cfg b/hybridse/CPPLINT.cfg new file mode 100644 index 00000000000..41435d96696 --- /dev/null +++ b/hybridse/CPPLINT.cfg @@ -0,0 +1,2 @@ +linelength=120 +filter=-build/header_guard diff --git a/hybridse/examples/toydb/src/bm/engine_bm_case.cc b/hybridse/examples/toydb/src/bm/engine_bm_case.cc index 452e3ec39d6..153f4db11ad 100644 --- a/hybridse/examples/toydb/src/bm/engine_bm_case.cc +++ b/hybridse/examples/toydb/src/bm/engine_bm_case.cc @@ -163,19 +163,17 @@ static void EngineBatchMode(const std::string sql, MODE mode, int64_t limit_cnt, switch (mode) { case BENCHMARK: { for (auto _ : *state) { - benchmark::DoNotOptimize( - static_cast< - const std::shared_ptr>( - session.Run())); + std::vector outputs; + benchmark::DoNotOptimize(session.Run(outputs)); } break; } case TEST: { - auto res = session.Run(); - if (!res) { + std::vector outputs; + if (0 != session.Run(outputs)) { FAIL(); } - ASSERT_EQ(static_cast(limit_cnt), res->GetCount()); + ASSERT_EQ(static_cast(limit_cnt), outputs.size()); break; } } @@ -735,20 +733,22 @@ void EngineBatchModeSimpleQueryBM(const std::string& db, const std::string& sql, case BENCHMARK: { for (auto _ : *state) { // use const value to avoid compiler bug for some version + vector outputs; benchmark::DoNotOptimize( static_cast< const std::shared_ptr>( - session.Run())); + session.Run(outputs))); } break; } case TEST: { session.EnableDebug(); - auto res = session.Run(); - if (!res) { + vector outputs; + + if (0 != session.Run(outputs)) { FAIL(); } - ASSERT_GT(res->GetCount(), 0u); + ASSERT_GT(outputs.size(), 0u); break; } } diff --git a/hybridse/examples/toydb/src/tablet/tablet_server_impl.cc b/hybridse/examples/toydb/src/tablet/tablet_server_impl.cc index 639275e8a85..6156959a305 100644 --- a/hybridse/examples/toydb/src/tablet/tablet_server_impl.cc +++ b/hybridse/examples/toydb/src/tablet/tablet_server_impl.cc @@ -173,22 +173,20 @@ void TabletServerImpl::Query(RpcController* ctrl, const QueryRequest* request, session.EnableDebug(); } codec::Row parameter(request->parameter_row()); - auto table = session.Run(parameter); + std::vector outputs; + int32_t ret = session.Run(parameter, outputs); - if (!table) { + if (0 != ret) { LOG(WARNING) << "fail to run sql " << request->sql(); status->set_code(common::kSqlError); status->set_msg("fail to run sql"); return; } - auto iter = table->GetIterator(); uint32_t byte_size = 0; uint32_t count = 0; - while (iter->Valid()) { - const codec::Row& row = iter->GetValue(); + for(auto& row: outputs) { byte_size += row.size(); - iter->Next(); buf.append(reinterpret_cast(row.buf()), row.size()); count += 1; } @@ -203,7 +201,6 @@ void TabletServerImpl::Query(RpcController* ctrl, const QueryRequest* request, return; } vm::RequestRunSession session; - session.SetParameterSchema(request->parameter_schema()); { base::Status base_status; bool ok = engine_->Get(request->sql(), request->db(), session, @@ -218,10 +215,9 @@ void TabletServerImpl::Query(RpcController* ctrl, const QueryRequest* request, if (request->is_debug()) { session.EnableDebug(); } - codec::Row parameter(request->parameter_row()); codec::Row row(request->row()); codec::Row output; - int32_t ret = session.Run(request->task_id(), row, parameter, &output); + int32_t ret = session.Run(request->task_id(), row, &output); if (ret != 0) { LOG(WARNING) << "fail to run sql " << request->sql(); status->set_code(common::kSqlError); diff --git 
a/hybridse/include/base/fe_status.h b/hybridse/include/base/fe_status.h index 663326b1b58..ed5c03dae80 100644 --- a/hybridse/include/base/fe_status.h +++ b/hybridse/include/base/fe_status.h @@ -27,10 +27,14 @@ namespace base { template static inline std::initializer_list __output_literal_args(STREAM& stream, // NOLINT Args... args) { // NOLINT +#ifdef __APPLE__ #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wreturn-stack-address" +#endif return std::initializer_list{(stream << args, 0)...}; +#ifdef __APPLE__ #pragma GCC diagnostic pop +#endif } #define MAX_STATUS_TRACE_SIZE 4096 diff --git a/hybridse/include/node/sql_node.h b/hybridse/include/node/sql_node.h index 0509f5e30f7..b4bed86967a 100644 --- a/hybridse/include/node/sql_node.h +++ b/hybridse/include/node/sql_node.h @@ -560,7 +560,7 @@ class OrderByNode : public ExprNode { virtual bool Equals(const ExprNode *that) const; OrderByNode *ShadowCopy(NodeManager *) const override; const ExprListNode *order_expressions() const { return order_expressions_; } - const OrderExpression *GetOrderExpression(int idx) const { + const OrderExpression *GetOrderExpression(size_t idx) const { if (nullptr == order_expressions_) { return nullptr; } diff --git a/hybridse/include/vm/engine.h b/hybridse/include/vm/engine.h index a519f0750db..44110d5a16e 100644 --- a/hybridse/include/vm/engine.h +++ b/hybridse/include/vm/engine.h @@ -155,11 +155,6 @@ class RunSession { return compile_info_->GetSchema(); } - /// Return query parameter schema. - virtual const Schema& GetParameterSchema() const { - return compile_info_->GetParameterSchema(); - } - /// Return query schema string. virtual const std::string& GetEncodedSchema() const { return compile_info_->GetEncodedSchema(); @@ -183,15 +178,12 @@ class RunSession { /// Bind this run session with specific procedure void SetSpName(const std::string& sp_name) { sp_name_ = sp_name; } - /// Bing the run session with specific parameter schema - void SetParameterSchema(const codec::Schema& schema) { parameter_schema_ = schema; } /// Return the engine mode of this run session EngineMode engine_mode() const { return engine_mode_; } protected: std::shared_ptr compile_info_; hybridse::vm::EngineMode engine_mode_; - codec::Schema parameter_schema_; bool is_debug_; std::string sp_name_; friend Engine; @@ -201,18 +193,23 @@ class RunSession { class BatchRunSession : public RunSession { public: explicit BatchRunSession(bool mini_batch = false) - : RunSession(kBatchMode), mini_batch_(mini_batch) {} + : RunSession(kBatchMode), parameter_schema_() {} ~BatchRunSession() {} /// \brief Query sql with parameter row in batch mode. /// Query results will be returned as std::vector in output int32_t Run(const Row& parameter_row, std::vector& output, // NOLINT uint64_t limit = 0); - /// \brief Query sql in batch mode. - /// Return query result as TableHandler pointer. - std::shared_ptr Run(const Row& parameter_row); + /// \brief Query sql in batch mode. + /// Query results will be returned as std::vector in output + int32_t Run(std::vector& output, // NOLINT + uint64_t limit = 0); + /// Bing the run session with specific parameter schema + void SetParameterSchema(const codec::Schema& schema) { parameter_schema_ = schema; } + /// Return query parameter schema. + virtual const Schema& GetParameterSchema() const { return parameter_schema_; } private: - const bool mini_batch_; + codec::Schema parameter_schema_; }; /// \brief RequestRunSession is a kind of RunSession designed for request mode query. 
/// @@ -226,8 +223,7 @@ class RequestRunSession : public RunSession { /// \param in_row request row /// \param output query result will be returned as Row in output /// \return `0` if run successfully else negative integer - int32_t Run(const Row& in_row, const Row& parameter_row, - Row* output); // NOLINT + int32_t Run(const Row& in_row, Row* output); // NOLINT /// \brief Run a task specified by task_id in request mode. /// @@ -235,8 +231,7 @@ class RequestRunSession : public RunSession { /// \param in_row: request row /// \param[out] output: result is written to this variable /// \return `0` if run successfully else negative integer - int32_t Run(uint32_t task_id, const Row& in_row, const Row& parameter_row, - Row* output); // NOLINT + int32_t Run(uint32_t task_id, const Row& in_row, Row* output); // NOLINT /// \brief Return the schema of request row virtual const Schema& GetRequestSchema() const { @@ -269,16 +264,14 @@ class BatchRequestRunSession : public RunSession { /// \param request_batch: a batch of request rows /// \param output: query results will be returned as std::vector in output /// \return 0 if runs successfully else negative integer - int32_t Run(const std::vector& request_batch, const Row& parameter_row, - std::vector& output); // NOLINT + int32_t Run(const std::vector& request_batch, std::vector& output); // NOLINT /// \brief Run a task specified by task_id in request mode. /// \param id: id of task /// \param request_batch: a batch of request rows /// \param output: query results will be returned as std::vector in output /// \return 0 if runs successfully else negative integer - int32_t Run(const uint32_t id, const std::vector& request_batch, const Row& parameter_row, - std::vector& output); // NOLINT + int32_t Run(const uint32_t id, const std::vector& request_batch, std::vector& output); // NOLINT /// \brief Add common column idx void AddCommonColumnIdx(size_t idx) { common_column_indices_.insert(idx); } @@ -352,6 +345,13 @@ class Engine { /// The results are returned as ExplainOutput in explain_output. /// The success or fail status message is returned as Status in status. /// TODO: base::Status* status -> base::Status& status + bool Explain(const std::string& sql, const std::string& db, + EngineMode engine_mode, ExplainOutput* explain_output, base::Status* status); + /// \brief Explain sql compiling result. + /// + /// The results are returned as ExplainOutput in explain_output. + /// The success or fail status message is returned as Status in status. + /// TODO: base::Status* status -> base::Status& status bool Explain(const std::string& sql, const std::string& db, EngineMode engine_mode, const codec::Schema& parameter_schema, ExplainOutput* explain_output, @@ -360,10 +360,8 @@ class Engine { /// \brief Same as above, but allowing compiling with configuring common column indices. 
/// /// The common column indices are used for common column optimization under EngineMode::kBatchRequestMode - bool Explain(const std::string& sql, const std::string& db, - EngineMode engine_mode, const codec::Schema& parameter_schema, - const std::set& common_column_indices, - ExplainOutput* explain_output, base::Status* status); + bool Explain(const std::string& sql, const std::string& db, EngineMode engine_mode, + const std::set& common_column_indices, ExplainOutput* explain_output, base::Status* status); /// \brief Update engine's catalog inline void UpdateCatalog(std::shared_ptr cl) { @@ -387,6 +385,10 @@ class Engine { std::shared_ptr info, base::Status& status); // NOLINT + bool Explain(const std::string& sql, const std::string& db, + EngineMode engine_mode, const codec::Schema& parameter_schema, + const std::set& common_column_indices, + ExplainOutput* explain_output, base::Status* status); std::shared_ptr cl_; EngineOptions options_; base::SpinMutex mu_; diff --git a/hybridse/src/cmd/simple_engine_demo.cc b/hybridse/src/cmd/simple_engine_demo.cc index ca879bb6ef1..5054c1cd4b3 100644 --- a/hybridse/src/cmd/simple_engine_demo.cc +++ b/hybridse/src/cmd/simple_engine_demo.cc @@ -162,10 +162,9 @@ int run() { get_status.code != common::kOk) { return SIMPLE_ENGINE_COMPILE_ERROR; } - Row empty_parameter_row; std::vector outputs; // run sql query - if (0 != session.Run(empty_parameter_row, outputs)) { + if (0 != session.Run(outputs)) { return SIMPLE_ENGINE_RUN_ERROR; } PrintRows(session.GetSchema(), outputs); diff --git a/hybridse/src/proto/fe_common.proto b/hybridse/src/proto/fe_common.proto index 535954c7938..bb878a4195f 100644 --- a/hybridse/src/proto/fe_common.proto +++ b/hybridse/src/proto/fe_common.proto @@ -22,6 +22,7 @@ option java_outer_classname = "Common"; enum StatusCode { kOk = 0; kRunning = 1; + kRunError = 2; kTypeError = 8; kFileIOError = 9; kUnSupport = 10; diff --git a/hybridse/src/testing/engine_test_base.cc b/hybridse/src/testing/engine_test_base.cc index 7d19b167c85..bde3d79b130 100644 --- a/hybridse/src/testing/engine_test_base.cc +++ b/hybridse/src/testing/engine_test_base.cc @@ -379,7 +379,14 @@ Status EngineTestRunner::Compile() { if (hybridse::sqlcase::SqlCase::IsDebug() || sql_case_.debug()) { session_->EnableDebug(); } - session_->SetParameterSchema(parameter_schema_); + if (session_->engine_mode() == kBatchMode) { + auto batch_session = + std::dynamic_pointer_cast(session_); + batch_session->SetParameterSchema(parameter_schema_); + } else { + CHECK_TRUE(parameter_schema_.empty(), common::kUnSupport, + "Request or BatchRequest mode do not support parameterized query currently") + } struct timeval st; struct timeval et; gettimeofday(&st, nullptr); diff --git a/hybridse/src/testing/engine_test_base.h b/hybridse/src/testing/engine_test_base.h index 66e0cb1478e..25d9d9f9c00 100644 --- a/hybridse/src/testing/engine_test_base.h +++ b/hybridse/src/testing/engine_test_base.h @@ -262,13 +262,14 @@ class RequestEngineTestRunner : public EngineTestRunner { auto request_session = std::dynamic_pointer_cast(session_); std::string request_name = request_session->GetRequestName(); + CHECK_TRUE(parameter_rows_.empty(), common::kUnSupport, "Request do not support parameterized query currently") Row parameter = parameter_rows_.empty() ? 
Row() : parameter_rows_[0]; for (auto in_row : request_rows_) { Row out_row; - int run_ret = request_session->Run(in_row, parameter, &out_row); + int run_ret = request_session->Run(in_row, &out_row); if (run_ret != 0) { return_code_ = ENGINE_TEST_RET_EXECUTION_ERROR; - return Status(kSqlError, "Run request session failed"); + return Status(common::kRunError, "Run request session failed"); } if (!has_batch_request) { CHECK_TRUE(AddRowIntoTable(request_name, in_row), kSqlError, @@ -417,8 +418,10 @@ class BatchRequestEngineTestRunner : public EngineTestRunner { auto request_session = std::dynamic_pointer_cast(session_); CHECK_TRUE(request_session != nullptr, common::kSqlError); - Row parameter = parameter_rows_.empty() ? Row() : parameter_rows_[0]; - int run_ret = request_session->Run(request_rows_, parameter, *outputs); + // Currently parameterized query un-support currently + CHECK_TRUE(parameter_rows_.empty(), common::kUnSupport, + "Batch request do not support parameterized query currently") + int run_ret = request_session->Run(request_rows_, *outputs); if (run_ret != 0) { return_code_ = ENGINE_TEST_RET_EXECUTION_ERROR; return Status(kSqlError, "Run batch request session failed"); diff --git a/hybridse/src/vm/engine.cc b/hybridse/src/vm/engine.cc index 77b8882bc12..e7e5abf9b02 100644 --- a/hybridse/src/vm/engine.cc +++ b/hybridse/src/vm/engine.cc @@ -148,19 +148,22 @@ bool Engine::IsCompatibleCache(RunSession& session, // NOLINT return false; } auto& cache_ctx = std::dynamic_pointer_cast(info)->get_sql_context(); - if (cache_ctx.parameter_types.size() != session.parameter_schema_.size()) { - status = Status(common::kSqlError, "Inconsistent cache parameter schema size"); - return false; - } - for (int i = 0; i < session.parameter_schema_.size(); i++) { - if (cache_ctx.parameter_types.Get(i).type() != session.GetParameterSchema().Get(i).type()) { - status = Status(common::kSqlError, "Inconsistent cache parameter type, expect " + - session.GetParameterSchema().Get(i).DebugString() + " but get " + - cache_ctx.parameter_types.Get(i).DebugString()); + + if (session.engine_mode() == kBatchMode) { + auto batch_sess = dynamic_cast(&session); + if (cache_ctx.parameter_types.size() != batch_sess->GetParameterSchema().size()) { + status = Status(common::kSqlError, "Inconsistent cache parameter schema size"); return false; } - } - if (session.engine_mode() == kBatchRequestMode) { + for (int i = 0; i < batch_sess->GetParameterSchema().size(); i++) { + if (cache_ctx.parameter_types.Get(i).type() != batch_sess->GetParameterSchema().Get(i).type()) { + status = Status(common::kSqlError, "Inconsistent cache parameter type, expect " + + batch_sess->GetParameterSchema().Get(i).DebugString() + + " but get " + cache_ctx.parameter_types.Get(i).DebugString()); + return false; + } + } + } else if (session.engine_mode() == kBatchRequestMode) { auto batch_req_sess = dynamic_cast(&session); if (batch_req_sess == nullptr) { return false; @@ -200,10 +203,10 @@ bool Engine::Get(const std::string& sql, const std::string& db, RunSession& sess sql_context.enable_batch_window_parallelization = options_.is_enable_batch_window_parallelization(); sql_context.enable_expr_optimize = options_.is_enable_expr_optimize(); sql_context.jit_options = options_.jit_options(); - sql_context.parameter_types = session.parameter_schema_; - - auto batch_req_sess = dynamic_cast(&session); - if (batch_req_sess) { + if (session.engine_mode() == kBatchMode) { + sql_context.parameter_types = dynamic_cast(&session)->GetParameterSchema(); + } else 
if (session.engine_mode() == kBatchRequestMode) { + auto batch_req_sess = dynamic_cast(&session); sql_context.batch_request_info.common_column_indices = batch_req_sess->common_column_indices(); } @@ -248,6 +251,10 @@ bool Engine::Explain(const std::string& sql, const std::string& db, EngineMode e LOG(WARNING) << "common column config can only be valid in batch request mode"; return false; } + if (!parameter_schema.empty() && engine_mode != kBatchMode) { + LOG(WARNING) << "parameterized query can only be valid in batch mode"; + return false; + } SqlContext ctx; ctx.engine_mode = engine_mode; ctx.sql = sql; @@ -300,11 +307,22 @@ bool Engine::Explain(const std::string& sql, const std::string& db, EngineMode e } return true; } +bool Engine::Explain(const std::string& sql, const std::string& db, EngineMode engine_mode, + ExplainOutput* explain_output, base::Status* status) { + const codec::Schema empty_schema; + return Explain(sql, db, engine_mode, empty_schema, {}, explain_output, status); +} bool Engine::Explain(const std::string& sql, const std::string& db, EngineMode engine_mode, const codec::Schema& parameter_schema, ExplainOutput* explain_output, base::Status* status) { return Explain(sql, db, engine_mode, parameter_schema, {}, explain_output, status); } +bool Engine::Explain(const std::string& sql, const std::string& db, EngineMode engine_mode, + const std::set& common_column_indices, + ExplainOutput* explain_output, base::Status* status) { + const codec::Schema empty_schema; + return Explain(sql, db, engine_mode, empty_schema, common_column_indices, explain_output, status); +} void Engine::ClearCacheLocked(const std::string& db) { std::lock_guard lock(mu_); @@ -363,12 +381,12 @@ bool RunSession::SetCompileInfo(const std::shared_ptr& compile_info return true; } -int32_t RequestRunSession::Run(const Row& in_row, const Row& parameter_row, Row* out_row) { +int32_t RequestRunSession::Run(const Row& in_row, Row* out_row) { DLOG(INFO) << "Request Row Run with main task"; return Run(std::dynamic_pointer_cast(compile_info_)->get_sql_context().cluster_job.main_task_id(), - in_row, parameter_row, out_row); + in_row, out_row); } -int32_t RequestRunSession::Run(const uint32_t task_id, const Row& in_row, const Row& parameter_row, Row* out_row) { +int32_t RequestRunSession::Run(const uint32_t task_id, const Row& in_row, Row* out_row) { auto task = std::dynamic_pointer_cast(compile_info_) ->get_sql_context() .cluster_job.GetTask(task_id) @@ -379,7 +397,7 @@ int32_t RequestRunSession::Run(const uint32_t task_id, const Row& in_row, const } DLOG(INFO) << "Request Row Run with task_id " << task_id; RunnerContext ctx(&std::dynamic_pointer_cast(compile_info_)->get_sql_context().cluster_job, in_row, - parameter_row, sp_name_, is_debug_); + sp_name_, is_debug_); auto output = task->RunWithCache(ctx); if (!output) { LOG(WARNING) << "run request plan output is null"; @@ -392,15 +410,14 @@ int32_t RequestRunSession::Run(const uint32_t task_id, const Row& in_row, const return -1; } -int32_t BatchRequestRunSession::Run(const std::vector& request_batch, const Row& parameter_row, - std::vector& output) { +int32_t BatchRequestRunSession::Run(const std::vector& request_batch, std::vector& output) { return Run(std::dynamic_pointer_cast(compile_info_)->get_sql_context().cluster_job.main_task_id(), - request_batch, parameter_row, output); + request_batch, output); } -int32_t BatchRequestRunSession::Run(const uint32_t id, const std::vector& request_batch, const Row& parameter_row, +int32_t BatchRequestRunSession::Run(const 
uint32_t id, const std::vector& request_batch, std::vector& output) { RunnerContext ctx(&std::dynamic_pointer_cast(compile_info_)->get_sql_context().cluster_job, - request_batch, parameter_row, sp_name_, is_debug_); + request_batch, sp_name_, is_debug_); auto task = std::dynamic_pointer_cast(compile_info_)->get_sql_context().cluster_job.GetTask(id).GetRoot(); if (nullptr == task) { @@ -419,34 +436,8 @@ int32_t BatchRequestRunSession::Run(const uint32_t id, const std::vector& r ctx.ClearCache(); return 0; } - -std::shared_ptr BatchRunSession::Run(const Row& parameter_row) { - RunnerContext ctx(&std::dynamic_pointer_cast(compile_info_)->get_sql_context().cluster_job, - parameter_row, is_debug_); - auto output = std::dynamic_pointer_cast(compile_info_) - ->get_sql_context() - .cluster_job.GetMainTask() - .GetRoot() - ->RunWithCache(ctx); - if (!output) { - LOG(WARNING) << "run batch plan output is null"; - return std::shared_ptr(); - } - switch (output->GetHanlderType()) { - case kTableHandler: { - return std::dynamic_pointer_cast(output); - } - case kRowHandler: { - auto table = std::shared_ptr(new MemTableHandler()); - table->AddRow(std::dynamic_pointer_cast(output)->GetValue()); - return table; - } - case kPartitionHandler: { - LOG(WARNING) << "partition output is invalid"; - return std::shared_ptr(); - } - } - return std::shared_ptr(); +int32_t BatchRunSession::Run(std::vector& rows, uint64_t limit) { + return Run(Row(), rows, limit); } int32_t BatchRunSession::Run(const Row& parameter_row, std::vector& rows, uint64_t limit) { auto& sql_ctx = std::dynamic_pointer_cast(compile_info_)->get_sql_context(); diff --git a/hybridse/src/vm/engine_compile_test.cc b/hybridse/src/vm/engine_compile_test.cc index 1578ee33aa7..d1fca9efb0c 100644 --- a/hybridse/src/vm/engine_compile_test.cc +++ b/hybridse/src/vm/engine_compile_test.cc @@ -67,21 +67,94 @@ TEST_F(EngineCompileTest, EngineLRUCacheTest) { { base::Status get_status; BatchRunSession bsession1; - ASSERT_TRUE(engine.Get(sql, "simple_db", bsession1, get_status)) - << get_status; + ASSERT_TRUE(engine.Get(sql, "simple_db", bsession1, get_status)) << get_status; ASSERT_EQ(get_status.code, common::kOk); BatchRunSession bsession2; ASSERT_TRUE(engine.Get(sql, "simple_db", bsession2, get_status)); ASSERT_EQ(get_status.code, common::kOk); - ASSERT_EQ(bsession1.GetCompileInfo().get(), - bsession2.GetCompileInfo().get()); + ASSERT_EQ(bsession1.GetCompileInfo().get(), bsession2.GetCompileInfo().get()); BatchRunSession bsession3; ASSERT_TRUE(engine.Get(sql2, "simple_db", bsession3, get_status)); ASSERT_EQ(get_status.code, common::kOk); ASSERT_TRUE(engine.Get(sql, "simple_db", bsession2, get_status)); ASSERT_EQ(get_status.code, common::kOk); - ASSERT_NE(bsession1.GetCompileInfo().get(), - bsession2.GetCompileInfo().get()); + ASSERT_NE(bsession1.GetCompileInfo().get(), bsession2.GetCompileInfo().get()); + } +} + +TEST_F(EngineCompileTest, EngineWithParameterizedLRUCacheTest) { + // Build Simple Catalog + auto catalog = BuildSimpleCatalog(); + + // database simple_db + hybridse::type::Database db; + db.set_name("simple_db"); + + // table t1 + hybridse::type::TableDef table_def; + sqlcase::CaseSchemaMock::BuildTableDef(table_def); + table_def.set_name("t1"); + ::hybridse::type::IndexDef* index = table_def.add_indexes(); + index->set_name("index0"); + index->add_first_keys("col0"); + index->set_second_key("col5"); + AddTable(db, table_def); + + catalog->AddDatabase(db); + + // Simple Engine + EngineOptions options; + options.set_compile_only(true); + 
options.set_max_sql_cache_size(1); + Engine engine(catalog, options); + + hybridse::codec::Schema parameter_schema; + { + auto column = parameter_schema.Add(); + column->set_type(hybridse::type::kVarchar); + } + { + auto column = parameter_schema.Add(); + column->set_type(hybridse::type::kInt64); + } + + hybridse::codec::Schema parameter_schema2; + { + auto column = parameter_schema2.Add(); + column->set_type(hybridse::type::kInt64); + } + { + auto column = parameter_schema2.Add(); + column->set_type(hybridse::type::kInt64); + } + std::string sql = "select col1, col2 from t1 where col0=? and col5AddDatabase(db); - codec::Schema empty_parameter_schema; std::set common_column_indices({2, 3, 5}); std::string sql = "select col0, col1, col2, sum(col1) over w1, \n" @@ -323,7 +395,6 @@ TEST_F(EngineCompileTest, ExplainBatchRequestTest) { ExplainOutput explain_output; base::Status status; ASSERT_TRUE(engine.Explain(sql, "simple_db", kBatchRequestMode, - empty_parameter_schema, common_column_indices, &explain_output, &status)); ASSERT_TRUE(status.isOK()) << status; diff --git a/hybridse/src/vm/local_tablet_handler.h b/hybridse/src/vm/local_tablet_handler.h index e96942f600a..b43f95d6b46 100644 --- a/hybridse/src/vm/local_tablet_handler.h +++ b/hybridse/src/vm/local_tablet_handler.h @@ -47,7 +47,7 @@ class LocalTabletRowHandler : public RowHandler { base::Status SyncValue() { DLOG(INFO) << "Sync Value ... local tablet SubQuery request: task id " << task_id_; - if (0 != session_.Run(task_id_, request_, Row(), &value_)) { + if (0 != session_.Run(task_id_, request_, &value_)) { return base::Status(common::kCallMethodError, "sub query fail: session run fail"); } @@ -107,7 +107,7 @@ class LocalTabletTableHandler : public MemTableHandler { base::Status SyncValue() { DLOG(INFO) << "Local tablet SubQuery batch request: task id " << task_id_; - if (0 != session_.Run(task_id_, requests_, Row(), table_)) { + if (0 != session_.Run(task_id_, requests_, table_)) { return base::Status(common::kCallMethodError, "sub query fail: session run fail"); } diff --git a/hybridse/src/vm/runner.h b/hybridse/src/vm/runner.h index a6a533908ff..71ad2a2652b 100644 --- a/hybridse/src/vm/runner.h +++ b/hybridse/src/vm/runner.h @@ -1440,26 +1440,24 @@ class RunnerContext { batch_cache_() {} explicit RunnerContext(hybridse::vm::ClusterJob* cluster_job, const hybridse::codec::Row& request, - const hybridse::codec::Row& parameter, const std::string& sp_name = "", const bool is_debug = false) : cluster_job_(cluster_job), sp_name_(sp_name), request_(request), requests_(), - parameter_(parameter), + parameter_(), is_debug_(is_debug), batch_cache_() {} explicit RunnerContext(hybridse::vm::ClusterJob* cluster_job, const std::vector& request_batch, - const hybridse::codec::Row& parameter, const std::string& sp_name = "", const bool is_debug = false) : cluster_job_(cluster_job), sp_name_(sp_name), request_(), requests_(request_batch), - parameter_(parameter), + parameter_(), is_debug_(is_debug), batch_cache_() {} diff --git a/hybridse/tools/compile_and_coverage.sh b/hybridse/tools/compile_and_coverage.sh index 47ad900af44..433352a456b 100755 --- a/hybridse/tools/compile_and_coverage.sh +++ b/hybridse/tools/compile_and_coverage.sh @@ -24,8 +24,9 @@ popd # goto hybridse directory pushd "$(dirname "$0")/.." 
- -./tools/setup_thirdparty.sh +# install hybridse thirdparty dependencies +HYBRIDSE_THIRDPARTY="$(pwd)/thirdparty" +../steps/setup_thirdparty.sh ${HYBRIDSE_THIRDPARTY} if uname -a | grep -q Darwin; then # in case coreutils not install on mac diff --git a/hybridse/tools/hybridse_build.sh b/hybridse/tools/hybridse_build.sh index a60eea17ca0..4268fa8db5a 100755 --- a/hybridse/tools/hybridse_build.sh +++ b/hybridse/tools/hybridse_build.sh @@ -18,8 +18,9 @@ set -eE # goto toplevel directory cd "$(dirname "$0")/.." -./tools/setup_thirdparty.sh - +# install hybridse thirdparty dependencies +HYBRIDSE_THIRDPARTY="$(pwd)/thirdparty" +../steps/setup_thirdparty.sh ${HYBRIDSE_THIRDPARTY} if uname -a | grep -q Darwin; then # in case coreutils not install on mac diff --git a/hybridse/tools/hybridse_core_test.sh b/hybridse/tools/hybridse_core_test.sh index eb15cb383a2..a8e77c66cae 100755 --- a/hybridse/tools/hybridse_core_test.sh +++ b/hybridse/tools/hybridse_core_test.sh @@ -21,7 +21,9 @@ popd # goto hybridse directory pushd "$(dirname "$0")/.." -./tools/setup_thirdparty.sh +# install hybridse thirdparty dependencies +HYBRIDSE_THIRDPARTY="$(pwd)/thirdparty" +../steps/setup_thirdparty.sh ${HYBRIDSE_THIRDPARTY} if uname -a | grep -q Darwin; then # in case coreutils not install on mac @@ -32,8 +34,6 @@ mkdir -p build cd build cmake .. -DCMAKE_BUILD_TYPE=Release make -j"$(nproc)" -./src/base/fe_slice_test -./src/base/hash_test SQL_CASE_BASE_DIR=${OPENMLDB_DIR} make -j"$(nproc)" test popd diff --git a/hybridse/tools/hybridse_deploy.sh b/hybridse/tools/hybridse_deploy.sh index 1cfe9ce2697..744bb4dd30a 100755 --- a/hybridse/tools/hybridse_deploy.sh +++ b/hybridse/tools/hybridse_deploy.sh @@ -44,7 +44,7 @@ fi mkdir -p build pushd build/ -cmake .. -DCMAKE_BUILD_TYPE=Release -DCMAKE_INSTALL_PREFIX="hybridse" +cmake .. -DCMAKE_BUILD_TYPE=Release -DTESTING_ENABLE=OFF -DEXAMPLES_ENABLE=OFF -DCMAKE_INSTALL_PREFIX="hybridse" make -j "$(nproc)" install mv hybridse "$OUTPUT_DIR" tar czf "../$OUTPUT_ARCHIVE" "$OUTPUT_DIR" diff --git a/hybridse/tools/micro_bench.sh b/hybridse/tools/micro_bench.sh index c5c4b623605..2ee70ab84d8 100755 --- a/hybridse/tools/micro_bench.sh +++ b/hybridse/tools/micro_bench.sh @@ -24,8 +24,9 @@ popd # goto hybridse directory cd "$(dirname "$0")/.."
- -./tools/setup_thirdparty.sh +# install thirdparty hybrise +HYBRIDSE_THIRDPARTY="$(pwd)/thirdparty" +../steps/setup_thirdparty.sh ${HYBRIDSE_THIRDPARTY} if uname -a | grep -q Darwin; then # in case coreutils not install on mac diff --git a/java/openmldb-jdbc/pom.xml b/java/openmldb-jdbc/pom.xml index 29fcbea2748..fc6c7b1266d 100644 --- a/java/openmldb-jdbc/pom.xml +++ b/java/openmldb-jdbc/pom.xml @@ -5,14 +5,14 @@ openmldb-parent com.4paradigm.openmldb - 0.2.0 + 0.2.1-SNAPSHOT ../pom.xml 4.0.0 openmldb-jdbc jar openmldb-jdbc - 0.2.0 + 0.2.1-SNAPSHOT UTF-8 @@ -38,7 +38,7 @@ com.4paradigm.openmldb openmldb-native - 0.2.0 + 0.2.1-SNAPSHOT org.apache.logging.log4j diff --git a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/jdbc/RequestPreparedStatement.java b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/jdbc/RequestPreparedStatement.java index a3d6bff9c6c..a70086ee178 100644 --- a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/jdbc/RequestPreparedStatement.java +++ b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/jdbc/RequestPreparedStatement.java @@ -96,7 +96,7 @@ public SQLResultSet executeQuery() throws SQLException { checkClosed(); dataBuild(); Status status = new Status(); - com._4paradigm.openmldb.ResultSet resultSet = router.ExecuteSQL(db, currentSql, currentRow, status); + com._4paradigm.openmldb.ResultSet resultSet = router.ExecuteSQLRequest(db, currentSql, currentRow, status); if (resultSet == null || status.getCode() != 0) { String msg = status.getMsg(); status.delete(); diff --git a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SqlExecutor.java b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SqlExecutor.java index ac06ea06ae3..e881e3bc987 100644 --- a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SqlExecutor.java +++ b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SqlExecutor.java @@ -41,7 +41,7 @@ public interface SqlExecutor { SQLInsertRows getInsertRows(String db, String sql); - ResultSet executeSQL(String db, String sql, SQLRequestRow row); + ResultSet executeSQLRequest(String db, String sql, SQLRequestRow row); PreparedStatement getInsertPreparedStmt(String db, String sql) throws SQLException; diff --git a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/SqlClusterExecutor.java b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/SqlClusterExecutor.java index 4095ec46c23..9971116fb7d 100644 --- a/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/SqlClusterExecutor.java +++ b/java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/SqlClusterExecutor.java @@ -178,10 +178,10 @@ public SQLInsertRows getInsertRows(String db, String sql) { } @Override - public ResultSet executeSQL(String db, String sql, SQLRequestRow row) { + public ResultSet executeSQLRequest(String db, String sql, SQLRequestRow row) { //TODO(wangtaize) add execption Status status = new Status(); - ResultSet rs = sqlRouter.ExecuteSQL(db, sql, row, status); + ResultSet rs = sqlRouter.ExecuteSQLRequest(db, sql, row, status); if (status.getCode() != 0) { logger.error("getInsertRow fail: {}", status.getMsg()); } diff --git a/java/openmldb-native/pom.xml b/java/openmldb-native/pom.xml index 761135fd1ad..ea5733366d0 100644 --- a/java/openmldb-native/pom.xml +++ b/java/openmldb-native/pom.xml @@ -5,14 +5,14 @@ openmldb-parent com.4paradigm.openmldb - 0.2.0 + 0.2.1-SNAPSHOT ../pom.xml 4.0.0 openmldb-native jar openmldb-native - 0.2.0 + 0.2.1-SNAPSHOT UTF-8 diff 
--git a/java/pom.xml b/java/pom.xml index a32a16e8302..53d05f5592d 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -8,7 +8,7 @@ pom openmldb - 0.2.0 + 0.2.1-SNAPSHOT openmldb-jdbc openmldb-native diff --git a/python/openmldb/driver.py b/python/openmldb/driver.py index 97463b52282..47d02418c43 100644 --- a/python/openmldb/driver.py +++ b/python/openmldb/driver.py @@ -128,7 +128,7 @@ def executeQuery(self, db, sql, row_builder = None): status = sql_router_sdk.Status() if row_builder is not None: - rs = self.sdk.ExecuteSQL(db, sql, row_builder, status) + rs = self.sdk.ExecuteSQLRequest(db, sql, row_builder, status) else: rs = self.sdk.ExecuteSQL(db, sql, status) if status.code != 0: diff --git a/src/catalog/schema_adapter.h b/src/catalog/schema_adapter.h index c1d7121bcee..9d36f9f51f8 100644 --- a/src/catalog/schema_adapter.h +++ b/src/catalog/schema_adapter.h @@ -232,49 +232,171 @@ class SchemaAdapter { return true; } - static bool ConvertType(const hybridse::type::ColumnDef& sql_column, openmldb::common::ColumnDesc* fedb_column) { - if (fedb_column == nullptr) { - LOG(WARNING) << "fedb_column is null"; + static bool ConvertType(hybridse::type::Type hybridse_type, openmldb::type::DataType* oepnmldb_type) { + if (oepnmldb_type == nullptr) { return false; } - fedb_column->set_name(sql_column.name()); - fedb_column->set_not_null(sql_column.is_not_null()); - fedb_column->set_is_constant(sql_column.is_constant()); - switch (sql_column.type()) { + switch (hybridse_type) { case hybridse::type::kBool: - fedb_column->set_data_type(openmldb::type::kBool); + *oepnmldb_type = openmldb::type::kBool; break; case hybridse::type::kInt16: - fedb_column->set_data_type(openmldb::type::kSmallInt); + *oepnmldb_type = openmldb::type::kSmallInt; break; case hybridse::type::kInt32: - fedb_column->set_data_type(openmldb::type::kInt); + *oepnmldb_type = openmldb::type::kInt; break; case hybridse::type::kInt64: - fedb_column->set_data_type(openmldb::type::kBigInt); + *oepnmldb_type = openmldb::type::kBigInt; break; case hybridse::type::kFloat: - fedb_column->set_data_type(openmldb::type::kFloat); + *oepnmldb_type = openmldb::type::kFloat; break; case hybridse::type::kDouble: - fedb_column->set_data_type(openmldb::type::kDouble); + *oepnmldb_type = openmldb::type::kDouble; break; case hybridse::type::kDate: - fedb_column->set_data_type(openmldb::type::kDate); + *oepnmldb_type = openmldb::type::kDate; break; case hybridse::type::kTimestamp: - fedb_column->set_data_type(openmldb::type::kTimestamp); + *oepnmldb_type = openmldb::type::kTimestamp; break; case hybridse::type::kVarchar: - fedb_column->set_data_type(openmldb::type::kVarchar); + *oepnmldb_type = openmldb::type::kVarchar; break; default: - LOG(WARNING) << "type " << hybridse::type::Type_Name(sql_column.type()) << " is not supported"; + LOG(WARNING) << "unsupported type" << hybridse_type; return false; } return true; } + static bool ConvertType(openmldb::type::DataType oepnmldb_type, hybridse::type::Type* hybridse_type) { + if (hybridse_type == nullptr) { + return false; + } + switch (oepnmldb_type) { + case openmldb::type::kBool: + *hybridse_type = hybridse::type::kBool; + break; + case openmldb::type::kSmallInt: + *hybridse_type = hybridse::type::kInt16; + break; + case openmldb::type::kInt: + *hybridse_type = hybridse::type::kInt32; + break; + case openmldb::type::kBigInt: + *hybridse_type = hybridse::type::kInt64; + break; + case openmldb::type::kFloat: + *hybridse_type = hybridse::type::kFloat; + break; + case openmldb::type::kDouble: + *hybridse_type = 
hybridse::type::kDouble; + break; + case openmldb::type::kDate: + *hybridse_type = hybridse::type::kDate; + break; + case openmldb::type::kTimestamp: + *hybridse_type = hybridse::type::kTimestamp; + break; + case openmldb::type::kVarchar: + case openmldb::type::kString: + *hybridse_type = hybridse::type::kVarchar; + break; + default: + LOG(WARNING) << "unsupported type: " << openmldb::type::DataType_Name(oepnmldb_type); + return false; + } + return true; + } + + static bool ConvertType(hybridse::sdk::DataType type, hybridse::type::Type *cased_type) { + switch (type) { + case hybridse::sdk::DataType::kTypeBool: + *cased_type = hybridse::type::kBool; + return true; + case hybridse::sdk::DataType::kTypeInt16: + *cased_type = hybridse::type::kInt16; + return true; + case hybridse::sdk::DataType::kTypeInt32: + *cased_type = hybridse::type::kInt32; + return true; + case hybridse::sdk::DataType::kTypeInt64: + *cased_type = hybridse::type::kInt64; + return true; + case hybridse::sdk::DataType::kTypeFloat: + *cased_type = hybridse::type::kFloat; + return true; + case hybridse::sdk::DataType::kTypeDouble: + *cased_type = hybridse::type::kDouble; + return true; + case hybridse::sdk::DataType::kTypeDate: + *cased_type = hybridse::type::kDate; + return true; + case hybridse::sdk::DataType::kTypeTimestamp: + *cased_type = hybridse::type::kTimestamp; + return true; + case hybridse::sdk::DataType::kTypeString: + *cased_type = hybridse::type::kVarchar; + return true; + default: + return false; + } + return false; + } + static bool ConvertType(hybridse::sdk::DataType type, openmldb::type::DataType *cased_type) { + switch (type) { + case hybridse::sdk::DataType::kTypeBool: + *cased_type = openmldb::type::kBool; + return true; + case hybridse::sdk::DataType::kTypeInt16: + *cased_type = openmldb::type::kSmallInt; + return true; + case hybridse::sdk::DataType::kTypeInt32: + *cased_type = openmldb::type::kInt; + return true; + case hybridse::sdk::DataType::kTypeInt64: + *cased_type = openmldb::type::kBigInt; + return true; + case hybridse::sdk::DataType::kTypeFloat: + *cased_type = openmldb::type::kFloat; + return true; + case hybridse::sdk::DataType::kTypeDouble: + *cased_type = openmldb::type::kDouble; + return true; + case hybridse::sdk::DataType::kTypeDate: + *cased_type = openmldb::type::kDate; + return true; + case hybridse::sdk::DataType::kTypeTimestamp: + *cased_type = openmldb::type::kTimestamp; + return true; + case hybridse::sdk::DataType::kTypeString: + *cased_type = openmldb::type::kString; + return true; + default: + return false; + } + return false; + } + + static bool ConvertType(const hybridse::type::ColumnDef& sql_column, openmldb::common::ColumnDesc* fedb_column) { + if (fedb_column == nullptr) { + LOG(WARNING) << "fedb_column is null"; + return false; + } + fedb_column->set_name(sql_column.name()); + fedb_column->set_not_null(sql_column.is_not_null()); + fedb_column->set_is_constant(sql_column.is_constant()); + openmldb::type::DataType openmldb_type; + if (!ConvertType(sql_column.type(), &openmldb_type)) { + LOG(WARNING) << "type " << hybridse::type::Type_Name(sql_column.type()) << " is not supported"; + return false; + } + fedb_column->set_data_type(openmldb_type); + return true; + } + static std::shared_ptr ConvertProcedureInfo( const openmldb::api::ProcedureInfo& sp_info) { ::hybridse::vm::Schema hybridse_in_schema; diff --git a/src/catalog/tablet_catalog_test.cc b/src/catalog/tablet_catalog_test.cc index 8766f54eeb9..bbaf7c48147 100644 --- a/src/catalog/tablet_catalog_test.cc +++ 
b/src/catalog/tablet_catalog_test.cc @@ -213,16 +213,14 @@ TEST_F(TabletCatalogTest, sql_smoke_test) { std::cout << status.msg << std::endl; } ASSERT_EQ(::hybridse::common::kOk, status.code); - std::vector output; - std::shared_ptr<::hybridse::vm::TableHandler> result = session.Run(); - if (!result) { + std::vector outputs; + if (0 != session.Run(outputs)) { ASSERT_TRUE(false); } ::hybridse::codec::RowView rv(session.GetSchema()); ASSERT_EQ(2, session.GetSchema().size()); - auto it = result->GetIterator(); - ASSERT_TRUE(it->Valid()); - const ::hybridse::codec::Row &row = it->GetValue(); + ASSERT_EQ(1u, outputs.size()); + const ::hybridse::codec::Row &row = outputs[0]; rv.Reset(row.buf(), row.size()); int64_t val = 0; ASSERT_EQ(0, rv.GetInt64(1, &val)); @@ -263,15 +261,12 @@ TEST_F(TabletCatalogTest, sql_last_join_smoke_test) { } ASSERT_EQ(::hybridse::common::kOk, status.code); std::vector output; - std::shared_ptr<::hybridse::vm::TableHandler> result = session.Run(); - if (!result) { - ASSERT_TRUE(false); - } + std::vector output_rows; + ASSERT_EQ(0, session.Run(output_rows)); ::hybridse::codec::RowView rv(session.GetSchema()); ASSERT_EQ(4, session.GetSchema().size()); - auto it = result->GetIterator(); - ASSERT_TRUE(it->Valid()); - const ::hybridse::codec::Row &row = it->GetValue(); + ASSERT_EQ(1u, output_rows.size()); + auto& row = output_rows[0]; rv.Reset(row.buf(), row.size()); ASSERT_EQ(args->pk, rv.GetStringUnsafe(0)); } @@ -303,15 +298,14 @@ TEST_F(TabletCatalogTest, sql_last_join_smoke_test2) { } ASSERT_EQ(::hybridse::common::kOk, status.code); std::vector output; - std::shared_ptr<::hybridse::vm::TableHandler> result = session.Run(); - if (!result) { + std::vector outputs; + if (0 != session.Run(outputs)) { ASSERT_TRUE(false); } ::hybridse::codec::RowView rv(session.GetSchema()); ASSERT_EQ(4, session.GetSchema().size()); - auto it = result->GetIterator(); - ASSERT_TRUE(it->Valid()); - const ::hybridse::codec::Row &row = it->GetValue(); + ASSERT_EQ(1u, outputs.size()); + auto& row = outputs[0]; rv.Reset(row.buf(), row.size()); const char *data = NULL; uint32_t data_size = 0; @@ -343,9 +337,8 @@ TEST_F(TabletCatalogTest, sql_window_smoke_500_test) { std::cout << status.msg << std::endl; } ASSERT_EQ(::hybridse::common::kOk, status.code); - std::vector output; - std::shared_ptr<::hybridse::vm::TableHandler> result = session.Run(); - if (!result) { + std::vector outputs; + if (0 != session.Run(outputs)) { ASSERT_TRUE(false); } ::hybridse::codec::RowView rv(session.GetSchema()); @@ -371,16 +364,14 @@ TEST_F(TabletCatalogTest, sql_window_smoke_test) { std::cout << status.msg << std::endl; } ASSERT_EQ(::hybridse::common::kOk, status.code); - std::vector output; - std::shared_ptr<::hybridse::vm::TableHandler> result = session.Run(); - if (!result) { + std::vector outputs; + if (0 != session.Run(outputs)) { ASSERT_TRUE(false); } ::hybridse::codec::RowView rv(session.GetSchema()); ASSERT_EQ(3, session.GetSchema().size()); - auto it = result->GetIterator(); - ASSERT_TRUE(it->Valid()); - const ::hybridse::codec::Row &row = it->GetValue(); + ASSERT_EQ(1u, outputs.size()); + const ::hybridse::codec::Row &row = outputs[0]; rv.Reset(row.buf(), row.size()); int64_t val = 0; ASSERT_EQ(0, rv.GetInt64(0, &val)); diff --git a/src/client/tablet_client.cc b/src/client/tablet_client.cc index 7c4781ed97f..152804001a9 100644 --- a/src/client/tablet_client.cc +++ b/src/client/tablet_client.cc @@ -83,14 +83,26 @@ bool TabletClient::Query(const std::string& db, const std::string& sql, const st return true; } 
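The TabletClient hunk that continues below adds a Query overload carrying the parameter types and the encoded parameter row for a parameterized batch query. A caller-side sketch under stated assumptions: `client` is a connected openmldb::client::TabletClient, `encoded_parameter_row` is the parameter row already serialized with the row codec, and the element type of parameter_types is inferred from the repeated parameter_types proto field:

    // Sketch only; error handling trimmed.
    std::vector<openmldb::type::DataType> parameter_types = {
        openmldb::type::kVarchar, openmldb::type::kBigInt};
    brpc::Controller cntl;
    ::openmldb::api::QueryResponse response;
    bool ok = client.Query("db1",
                           "SELECT col0, col1 FROM t1 WHERE col6 = ? AND col1 < ?;",
                           parameter_types, encoded_parameter_row, &cntl, &response);
    if (!ok) {
        LOG(WARNING) << "parameterized batch query failed";
    }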
-bool TabletClient::Query(const std::string& db, const std::string& sql, brpc::Controller* cntl, - ::openmldb::api::QueryResponse* response, const bool is_debug) { +bool TabletClient::Query(const std::string& db, const std::string& sql, + const std::vector& parameter_types, + const std::string& parameter_row, + brpc::Controller* cntl, ::openmldb::api::QueryResponse* response, const bool is_debug) { if (cntl == NULL || response == NULL) return false; ::openmldb::api::QueryRequest request; request.set_sql(sql); request.set_db(db); request.set_is_batch(true); request.set_is_debug(is_debug); + request.set_parameter_row_size(parameter_row.size()); + request.set_parameter_row_slices(1); + for (auto& type : parameter_types) { + request.add_parameter_types(type); + } + auto& io_buf = cntl->request_attachment(); + if (!codec::EncodeRpcRow(reinterpret_cast(parameter_row.data()), parameter_row.size(), &io_buf)) { + LOG(WARNING) << "Encode parameter buffer failed"; + return false; + } bool ok = client_.SendRequest(&::openmldb::api::TabletServer_Stub::Query, cntl, &request, response); if (!ok || response->code() != 0) { diff --git a/src/client/tablet_client.h b/src/client/tablet_client.h index e923c702e5d..4d4244453d2 100644 --- a/src/client/tablet_client.h +++ b/src/client/tablet_client.h @@ -68,8 +68,9 @@ class TabletClient { const openmldb::common::VersionPair& pair, std::string& msg); // NOLINT - bool Query(const std::string& db, const std::string& sql, brpc::Controller* cntl, - ::openmldb::api::QueryResponse* response, const bool is_debug = false); + bool Query(const std::string& db, const std::string& sql, + const std::vector& parameter_types, const std::string& parameter_row, + brpc::Controller* cntl, ::openmldb::api::QueryResponse* response, const bool is_debug = false); bool Query(const std::string& db, const std::string& sql, const std::string& row, brpc::Controller* cntl, ::openmldb::api::QueryResponse* response, const bool is_debug = false); diff --git a/src/proto/tablet.proto b/src/proto/tablet.proto index 0ba43dcb945..8df4dd4cc24 100755 --- a/src/proto/tablet.proto +++ b/src/proto/tablet.proto @@ -653,6 +653,9 @@ message QueryRequest { optional uint64 task_id = 7; optional uint32 row_size = 8; optional uint32 row_slices = 9; + optional uint32 parameter_row_size = 10; + optional uint32 parameter_row_slices = 11; + repeated openmldb.type.DataType parameter_types = 12; } message QueryResponse { diff --git a/src/sdk/mini_cluster.h b/src/sdk/mini_cluster.h index aa6d83155fd..8b69c975c73 100644 --- a/src/sdk/mini_cluster.h +++ b/src/sdk/mini_cluster.h @@ -55,7 +55,9 @@ namespace openmldb { namespace sdk { constexpr int MAX_TABLET_NUM = 3; +#ifdef __linux__ #pragma pack(8) +#endif class MiniCluster { public: explicit MiniCluster(int32_t zk_port) diff --git a/src/sdk/mini_cluster_bm.cc b/src/sdk/mini_cluster_bm.cc index 67b18266f20..cdf602370e3 100644 --- a/src/sdk/mini_cluster_bm.cc +++ b/src/sdk/mini_cluster_bm.cc @@ -104,7 +104,7 @@ void BM_RequestQuery(benchmark::State& state, hybridse::sqlcase::SqlCase& sql_ca auto rs = router->CallProcedure(sql_case.db(), sql_case.sp_name_, request_row, &status); openmldb::sdk::SQLSDKTest::PrintResultSet(rs); } else { - auto rs = router->ExecuteSQL(sql_case.db(), sql, request_row, &status); + auto rs = router->ExecuteSQLRequest(sql_case.db(), sql, request_row, &status); openmldb::sdk::SQLSDKTest::PrintResultSet(rs); } } @@ -131,7 +131,7 @@ void BM_RequestQuery(benchmark::State& state, hybridse::sqlcase::SqlCase& sql_ca } state.SkipWithError("BENCHMARK 
DEBUG"); } else { - auto rs = router->ExecuteSQL(sql_case.db(), sql, request_row, &status); + auto rs = router->ExecuteSQLRequest(sql_case.db(), sql, request_row, &status); if (!rs) FAIL() << "sql case expect success == true"; openmldb::sdk::SQLSDKTest::PrintResultSet(rs); hybridse::type::TableDef output_table; @@ -158,7 +158,7 @@ void BM_RequestQuery(benchmark::State& state, hybridse::sqlcase::SqlCase& sql_ca } } else { for (auto _ : state) { - benchmark::DoNotOptimize(router->ExecuteSQL(sql_case.db(), sql, request_row, &status)); + benchmark::DoNotOptimize(router->ExecuteSQLRequest(sql_case.db(), sql, request_row, &status)); } } } diff --git a/src/sdk/mini_cluster_microbenchmark.cc b/src/sdk/mini_cluster_microbenchmark.cc index d3df18bfe1f..08b4ed1fcbb 100644 --- a/src/sdk/mini_cluster_microbenchmark.cc +++ b/src/sdk/mini_cluster_microbenchmark.cc @@ -460,18 +460,18 @@ static void BM_SimpleRowWindow(benchmark::State& state) { // NOLINT request_row->AppendTimestamp(ts + 1000); request_row->Build(); for (int i = 0; i < 10; i++) { - router->ExecuteSQL(db, exe_sql, request_row, &status); + router->ExecuteSQLRequest(db, exe_sql, request_row, &status); } LOG(INFO) << "------------WARMUP FINISHED ------------\n\n"; if (hybridse::sqlcase::SqlCase::IsDebug() || hybridse::sqlcase::SqlCase::IS_PERF()) { for (auto _ : state) { - router->ExecuteSQL(db, exe_sql, request_row, &status); + router->ExecuteSQLRequest(db, exe_sql, request_row, &status); state.SkipWithError("benchmark case debug"); break; } } else { for (auto _ : state) { - benchmark::DoNotOptimize(router->ExecuteSQL(db, exe_sql, request_row, &status)); + benchmark::DoNotOptimize(router->ExecuteSQLRequest(db, exe_sql, request_row, &status)); } } } @@ -551,18 +551,18 @@ static void BM_SimpleRow4Window(benchmark::State& state) { // NOLINT request_row->AppendTimestamp(ts + 1000); request_row->Build(); for (int i = 0; i < 10; i++) { - router->ExecuteSQL(db, exe_sql, request_row, &status); + router->ExecuteSQLParameterized(db, exe_sql, request_row, &status); } LOG(INFO) << "------------WARMUP FINISHED ------------\n\n"; if (hybridse::sqlcase::SqlCase::IsDebug() || hybridse::sqlcase::SqlCase::IS_PERF()) { for (auto _ : state) { - router->ExecuteSQL(db, exe_sql, request_row, &status); + router->ExecuteSQLParameterized(db, exe_sql, request_row, &status); state.SkipWithError("benchmark case debug"); break; } } else { for (auto _ : state) { - benchmark::DoNotOptimize(router->ExecuteSQL(db, exe_sql, request_row, &status)); + benchmark::DoNotOptimize(router->ExecuteSQLParameterized(db, exe_sql, request_row, &status)); } } } diff --git a/src/sdk/sql_cluster_router.cc b/src/sdk/sql_cluster_router.cc index 5b9fb53b769..8070dff13e0 100644 --- a/src/sdk/sql_cluster_router.cc +++ b/src/sdk/sql_cluster_router.cc @@ -191,10 +191,15 @@ bool SQLClusterRouter::Init() { std::shared_ptr SQLClusterRouter::GetRequestRow(const std::string& db, const std::string& sql, ::hybridse::sdk::Status* status) { + return GetRequestRow(db, sql, std::shared_ptr<::hybridse::sdk::Schema>(), status); +} +std::shared_ptr SQLClusterRouter::GetRequestRow( + const std::string& db, const std::string& sql, const std::shared_ptr<::hybridse::sdk::Schema> parameter_schema, + ::hybridse::sdk::Status* status) { if (status == NULL) return std::shared_ptr(); std::shared_ptr cache = GetCache(db, sql); std::set col_set; - if (cache) { + if (cache && cache->IsCompatibleCache(parameter_schema)) { status->code = 0; const std::string& router_col = cache->router.GetRouterCol(); if (!router_col.empty()) { 
@@ -204,7 +209,12 @@ std::shared_ptr SQLClusterRouter::GetRequestRow(const std::string } ::hybridse::vm::ExplainOutput explain; ::hybridse::base::Status vm_status; - bool ok = cluster_sdk_->GetEngine()->Explain(sql, db, ::hybridse::vm::kRequestMode, &explain, &vm_status); + + const std::shared_ptr<::hybridse::sdk::SchemaImpl> schema_impl = + std::dynamic_pointer_cast<::hybridse::sdk::SchemaImpl>(parameter_schema); + bool ok = cluster_sdk_->GetEngine()->Explain(sql, db, ::hybridse::vm::kRequestMode, + schema_impl ? schema_impl->GetSchema() : ::hybridse::vm::Schema(), + &explain, &vm_status); if (!ok) { status->code = -1; status->msg = vm_status.msg; @@ -213,7 +223,7 @@ std::shared_ptr SQLClusterRouter::GetRequestRow(const std::string } std::shared_ptr<::hybridse::sdk::SchemaImpl> schema = std::make_shared<::hybridse::sdk::SchemaImpl>(explain.input_schema); - SetCache(db, sql, std::make_shared(schema, explain.router)); + SetCache(db, sql, std::make_shared(schema, parameter_schema, explain.router)); const std::string& router_col = explain.router.GetRouterCol(); if (!router_col.empty()) { col_set.insert(router_col); @@ -443,13 +453,15 @@ DefaultValueMap SQLClusterRouter::GetDefaultMap(std::shared_ptr<::openmldb::name if (!column_map.empty()) { i = column_map.at(idx); } - if (hybridse::node::kExprPrimary != row->children_.at(i)->GetExprType()) { - LOG(WARNING) << "insert value isn't const value"; + if (hybridse::node::kExprPrimary != row->children_.at(i)->GetExprType() + && hybridse::node::kExprParameter != row->children_.at(i)->GetExprType()) { + LOG(WARNING) << "insert value isn't const value or placeholder"; return DefaultValueMap(); } - ::hybridse::node::ConstNode* primary = - dynamic_cast<::hybridse::node::ConstNode*>(row->children_.at(i)); - if (!primary->IsPlaceholder()) { + + if (hybridse::node::kExprPrimary == row->children_.at(i)->GetExprType()) { + ::hybridse::node::ConstNode* primary = + dynamic_cast<::hybridse::node::ConstNode*>(row->children_.at(i)); std::shared_ptr<::hybridse::node::ConstNode> val; if (primary->IsNull()) { if (column.not_null()) { @@ -626,28 +638,51 @@ bool SQLClusterRouter::DropDB(const std::string& db, hybridse::sdk::Status* stat } return true; } - std::shared_ptr<::openmldb::client::TabletClient> SQLClusterRouter::GetTabletClient( const std::string& db, const std::string& sql, const std::shared_ptr& row) { + return GetTabletClient(db, sql, row, std::shared_ptr()); +} +std::shared_ptr<::openmldb::client::TabletClient> SQLClusterRouter::GetTabletClient( + const std::string& db, const std::string& sql, const std::shared_ptr& row, + const std::shared_ptr& parameter) { + ::hybridse::codec::Schema parameter_schema_raw; + if (parameter) { + for (int i = 0; i < parameter->GetSchema()->GetColumnCnt(); i++) { + auto column = parameter_schema_raw.Add(); + hybridse::type::Type hybridse_type; + if (!openmldb::catalog::SchemaAdapter::ConvertType(parameter->GetSchema()->GetColumnType(i), + &hybridse_type)) { + LOG(WARNING) << "Invalid parameter type "; + return std::shared_ptr<::openmldb::client::TabletClient>(); + } + column->set_type(hybridse_type); + } + } std::shared_ptr<::openmldb::catalog::TabletAccessor> tablet; auto cache = GetCache(db, sql); + auto parameter_schema = std::make_shared<::hybridse::sdk::SchemaImpl>(parameter_schema_raw); + if (cache && cache->IsCompatibleCache(parameter_schema)) { + cache = std::shared_ptr(); + } if (!cache) { ::hybridse::vm::ExplainOutput explain; ::hybridse::base::Status vm_status; - if (cluster_sdk_->GetEngine()->Explain(sql, db, 
::hybridse::vm::kBatchMode, &explain, &vm_status)) { + if (cluster_sdk_->GetEngine()->Explain(sql, db, ::hybridse::vm::kBatchMode, parameter_schema_raw, &explain, + &vm_status)) { std::shared_ptr<::hybridse::sdk::SchemaImpl> schema; if (explain.input_schema.size() > 0) { schema = std::make_shared<::hybridse::sdk::SchemaImpl>(explain.input_schema); } else { auto table_info = cluster_sdk_->GetTableInfo(db, explain.router.GetMainTable()); - ::hybridse::vm::Schema raw_schema; + ::hybridse::codec::Schema raw_schema; if (table_info && ::openmldb::catalog::SchemaAdapter::ConvertSchema(table_info->column_desc(), &raw_schema)) { schema = std::make_shared<::hybridse::sdk::SchemaImpl>(raw_schema); } } if (schema) { - cache = std::make_shared(schema, explain.router); + cache = std::make_shared( + schema, parameter_schema, explain.router); SetCache(db, sql, cache); } } @@ -734,10 +769,10 @@ void SQLClusterRouter::GetTables(::hybridse::vm::PhysicalOpNode* node, std::set< GetTables(node->GetProducer(i), tables); } } - -std::shared_ptr SQLClusterRouter::ExecuteSQL(const std::string& db, const std::string& sql, - std::shared_ptr row, - hybridse::sdk::Status* status) { +std::shared_ptr SQLClusterRouter::ExecuteSQLRequest(const std::string& db, + const std::string& sql, + std::shared_ptr row, + hybridse::sdk::Status* status) { if (!row || !status) { LOG(WARNING) << "input is invalid"; return std::shared_ptr<::hybridse::sdk::ResultSet>(); @@ -767,19 +802,31 @@ std::shared_ptr SQLClusterRouter::ExecuteSQL(const std auto rs = ResultSetSQL::MakeResultSet(response, cntl, status); return rs; } - std::shared_ptr<::hybridse::sdk::ResultSet> SQLClusterRouter::ExecuteSQL(const std::string& db, const std::string& sql, ::hybridse::sdk::Status* status) { + return ExecuteSQLParameterized(db, sql, std::shared_ptr(), status); +} +std::shared_ptr<::hybridse::sdk::ResultSet> SQLClusterRouter::ExecuteSQLParameterized( + const std::string& db, const std::string& sql, std::shared_ptr parameter, + ::hybridse::sdk::Status* status) { auto cntl = std::make_shared<::brpc::Controller>(); cntl->set_timeout_ms(options_.request_timeout); auto response = std::make_shared<::openmldb::api::QueryResponse>(); - auto client = GetTabletClient(db, sql, std::shared_ptr()); + std::vector parameter_types; + if (parameter && !ExtractDBTypes(parameter->GetSchema(), parameter_types)) { + status->msg = "convert parameter types error"; + status->code = -1; + return std::shared_ptr<::hybridse::sdk::ResultSet>(); + } + + auto client = GetTabletClient(db, sql, std::shared_ptr(), parameter); if (!client) { DLOG(INFO) << "no tablet avilable for sql " << sql; return std::shared_ptr<::hybridse::sdk::ResultSet>(); } DLOG(INFO) << " send query to tablet " << client->GetEndpoint(); - if (!client->Query(db, sql, cntl.get(), response.get(), options_.enable_debug)) { + if (!client->Query(db, sql, parameter_types, parameter ? 
parameter->GetRow() : "", cntl.get(), response.get(), + options_.enable_debug)) { return std::shared_ptr<::hybridse::sdk::ResultSet>(); } auto rs = ResultSetSQL::MakeResultSet(response, cntl, status); @@ -796,7 +843,7 @@ std::shared_ptr SQLClusterRouter::ExecuteSQLBatchReque auto cntl = std::make_shared<::brpc::Controller>(); cntl->set_timeout_ms(options_.request_timeout); auto response = std::make_shared<::openmldb::api::SQLBatchRequestQueryResponse>(); - auto client = GetTabletClient(db, sql, std::shared_ptr()); + auto client = GetTabletClient(db, sql, std::shared_ptr(), std::shared_ptr()); if (!client) { status->code = -1; status->msg = "no tablet found"; @@ -976,7 +1023,9 @@ std::shared_ptr SQLClusterRouter::Explain(const std::string& db, co ::hybridse::sdk::Status* status) { ::hybridse::vm::ExplainOutput explain_output; ::hybridse::base::Status vm_status; - bool ok = cluster_sdk_->GetEngine()->Explain(sql, db, ::hybridse::vm::kRequestMode, &explain_output, &vm_status); + ::hybridse::codec::Schema parameter_schema; + bool ok = cluster_sdk_->GetEngine()->Explain(sql, db, ::hybridse::vm::kRequestMode, parameter_schema, + &explain_output, &vm_status); if (!ok) { status->code = -1; status->msg = vm_status.msg; @@ -1134,10 +1183,11 @@ bool SQLClusterRouter::HandleSQLCreateProcedure(hybridse::node::CreateProcedureP bool ok; hybridse::vm::ExplainOutput explain_output; if (input_common_column_indices.empty()) { - ok = cluster_sdk_->GetEngine()->Explain(sql, db, hybridse::vm::kRequestMode, &explain_output, &sql_status); + ok = cluster_sdk_->GetEngine()->Explain(sql, db, hybridse::vm::kRequestMode, &explain_output, + &sql_status); } else { - ok = cluster_sdk_->GetEngine()->Explain(sql, db, hybridse::vm::kBatchRequestMode, input_common_column_indices, - &explain_output, &sql_status); + ok = cluster_sdk_->GetEngine()->Explain(sql, db, hybridse::vm::kBatchRequestMode, + input_common_column_indices, &explain_output, &sql_status); } if (!ok) { *msg = "fail to explain sql" + sql_status.msg; @@ -1211,7 +1261,20 @@ bool SQLClusterRouter::CheckSQLSyntax(const std::string& sql) { } return true; } - +bool SQLClusterRouter::ExtractDBTypes(const std::shared_ptr schema, + std::vector& db_types) { // NOLINT + if (schema) { + for (int i = 0; i < schema->GetColumnCnt(); i++) { + openmldb::type::DataType casted_type; + if (!openmldb::catalog::SchemaAdapter::ConvertType(schema->GetColumnType(i), &casted_type)) { + LOG(WARNING) << "Invalid parameter type " << schema->GetColumnType(i); + return false; + } + db_types.push_back(casted_type); + } + } + return true; +} std::vector> SQLClusterRouter::ShowProcedure(std::string* msg) { std::vector> vec; if (msg == nullptr) { diff --git a/src/sdk/sql_cluster_router.h b/src/sdk/sql_cluster_router.h index 240d1d602a0..eb5d268cbfb 100644 --- a/src/sdk/sql_cluster_router.h +++ b/src/sdk/sql_cluster_router.h @@ -57,12 +57,36 @@ struct SQLCache { column_schema = openmldb::sdk::ConvertToSchema(table_info); } - SQLCache(std::shared_ptr<::hybridse::sdk::Schema> column_schema, const ::hybridse::vm::Router& input_router) - : table_info(), default_map(), column_schema(column_schema), str_length(0), router(input_router) {} - + SQLCache(std::shared_ptr<::hybridse::sdk::Schema> column_schema, + std::shared_ptr<::hybridse::sdk::Schema> parameter_schema, const ::hybridse::vm::Router& input_router) + : table_info(), + default_map(), + column_schema(column_schema), + parameter_schema(parameter_schema), + str_length(0), + router(input_router) {} + bool 
IsCompatibleCache(std::shared_ptr<::hybridse::sdk::Schema> other_parameter_schema) { + if (!parameter_schema && !other_parameter_schema) { + return true; + } + if (!parameter_schema || !other_parameter_schema) { + return false; + } + if (parameter_schema->GetColumnCnt() != other_parameter_schema->GetColumnCnt()) { + return false; + } + + for (int i = 0; i < parameter_schema->GetColumnCnt(); i++) { + if (parameter_schema->GetColumnType(i) != other_parameter_schema->GetColumnType(i)) { + return false; + } + } + return true; + } std::shared_ptr<::openmldb::nameserver::TableInfo> table_info; DefaultValueMap default_map; std::shared_ptr<::hybridse::sdk::Schema> column_schema; + std::shared_ptr<::hybridse::sdk::Schema> parameter_schema; uint32_t str_length; ::hybridse::vm::Router router; }; @@ -98,7 +122,9 @@ class SQLClusterRouter : public SQLRouter { std::shared_ptr GetRequestRow(const std::string& db, const std::string& sql, ::hybridse::sdk::Status* status) override; - + std::shared_ptr GetRequestRow( + const std::string& db, const std::string& sql, + const std::shared_ptr<::hybridse::sdk::Schema> parameter_schema, hybridse::sdk::Status* status) override; std::shared_ptr GetRequestRowByProcedure(const std::string& db, const std::string& sp_name, ::hybridse::sdk::Status* status) override; @@ -108,11 +134,14 @@ class SQLClusterRouter : public SQLRouter { std::shared_ptr GetInsertRows(const std::string& db, const std::string& sql, ::hybridse::sdk::Status* status) override; + std::shared_ptr ExecuteSQLRequest(const std::string& db, const std::string& sql, + std::shared_ptr row, + hybridse::sdk::Status* status) override; std::shared_ptr ExecuteSQL(const std::string& db, const std::string& sql, - std::shared_ptr row, - hybridse::sdk::Status* status) override; - - std::shared_ptr ExecuteSQL(const std::string& db, const std::string& sql, + ::hybridse::sdk::Status* status) override; + /// Execute batch SQL with parameter row + std::shared_ptr ExecuteSQLParameterized(const std::string& db, const std::string& sql, + std::shared_ptr parameter, ::hybridse::sdk::Status* status) override; std::shared_ptr ExecuteSQLBatchRequest(const std::string& db, const std::string& sql, @@ -145,8 +174,11 @@ class SQLClusterRouter : public SQLRouter { const std::string& db, const std::string& sp_name, int64_t timeout_ms, std::shared_ptr row_batch, hybridse::sdk::Status* status); - std::shared_ptr<::openmldb::client::TabletClient> GetTabletClient(const std::string& db, const std::string& sql, - const std::shared_ptr& row); + std::shared_ptr<::openmldb::client::TabletClient> GetTabletClient( + const std::string& db, const std::string& sql, const std::shared_ptr& row); + std::shared_ptr<::openmldb::client::TabletClient> GetTabletClient( + const std::string& db, const std::string& sql, const std::shared_ptr& row, + const std::shared_ptr& parameter_row); private: void GetTables(::hybridse::vm::PhysicalOpNode* node, std::set* tables); @@ -184,6 +216,8 @@ class SQLClusterRouter : public SQLRouter { std::shared_ptr GetTablet(const std::string& db, const std::string& sp_name, hybridse::sdk::Status* status); + bool ExtractDBTypes(const std::shared_ptr schema, + std::vector& parameter_types); // NOLINT private: SQLRouterOptions options_; diff --git a/src/sdk/sql_cluster_test.cc b/src/sdk/sql_cluster_test.cc index dbde94b8f1c..98a0ef09120 100644 --- a/src/sdk/sql_cluster_test.cc +++ b/src/sdk/sql_cluster_test.cc @@ -158,13 +158,37 @@ static std::shared_ptr GetNewSQLRouter() { sql_opt.enable_debug = 
hybridse::sqlcase::SqlCase::IsDebug(); return NewClusterSQLRouter(sql_opt); } - +static bool IsRequestSupportMode(const std::string& mode) { + if (mode.find("hybridse-only") != std::string::npos || + mode.find("rtidb-unsupport") != std::string::npos || + mode.find("request-unsupport") != std::string::npos + || mode.find("cluster-unsupport") != std::string::npos) { + return false; + } + return true; +} +static bool IsBatchRequestSupportMode(const std::string& mode) { + if (mode.find("hybridse-only") != std::string::npos || + mode.find("rtidb-unsupport") != std::string::npos || + mode.find("batch-request-unsupport") != std::string::npos || + mode.find("request-unsupport") != std::string::npos + || mode.find("cluster-unsupport") != std::string::npos) { + return false; + } + return true; +} +static bool IsBatchSupportMode(const std::string& mode) { + if (mode.find("hybridse-only") != std::string::npos || + mode.find("rtidb-unsupport") != std::string::npos || + mode.find("batch-unsupport") != std::string::npos + || mode.find("cluster-unsupport") != std::string::npos) { + return false; + } + return true; +} TEST_P(SQLSDKQueryTest, sql_sdk_distribute_batch_request_test) { auto sql_case = GetParam(); - if (boost::contains(sql_case.mode(), "rtidb-unsupport") || - boost::contains(sql_case.mode(), "rtidb-request-unsupport") || - boost::contains(sql_case.mode(), "request-unsupport") || - boost::contains(sql_case.mode(), "cluster-unsupport")) { + if (!IsBatchRequestSupportMode(sql_case.mode())) { LOG(WARNING) << "Unsupport mode: " << sql_case.mode(); return; } @@ -179,10 +203,7 @@ TEST_P(SQLSDKQueryTest, sql_sdk_distribute_batch_request_test) { } TEST_P(SQLSDKBatchRequestQueryTest, sql_sdk_distribute_batch_request_test) { auto sql_case = GetParam(); - if (boost::contains(sql_case.mode(), "rtidb-unsupport") || - boost::contains(sql_case.mode(), "rtidb-request-unsupport") || - boost::contains(sql_case.mode(), "request-unsupport") || - boost::contains(sql_case.mode(), "cluster-unsupport")) { + if (!IsBatchRequestSupportMode(sql_case.mode())) { LOG(WARNING) << "Unsupport mode: " << sql_case.mode(); return; } @@ -199,10 +220,7 @@ TEST_P(SQLSDKBatchRequestQueryTest, sql_sdk_distribute_batch_request_test) { TEST_P(SQLSDKQueryTest, sql_sdk_distribute_request_test) { auto sql_case = GetParam(); LOG(INFO) << "ID: " << sql_case.id() << ", DESC: " << sql_case.desc(); - if (boost::contains(sql_case.mode(), "rtidb-unsupport") || - boost::contains(sql_case.mode(), "rtidb-request-unsupport") || - boost::contains(sql_case.mode(), "request-unsupport") || - boost::contains(sql_case.mode(), "cluster-unsupport")) { + if (!IsRequestSupportMode(sql_case.mode())) { LOG(WARNING) << "Unsupport mode: " << sql_case.mode(); return; } @@ -212,10 +230,7 @@ TEST_P(SQLSDKQueryTest, sql_sdk_distribute_request_test) { } TEST_P(SQLSDKQueryTest, sql_sdk_distribute_batch_request_single_partition_test) { auto sql_case = GetParam(); - if (boost::contains(sql_case.mode(), "rtidb-unsupport") || - boost::contains(sql_case.mode(), "rtidb-request-unsupport") || - boost::contains(sql_case.mode(), "request-unsupport") || - boost::contains(sql_case.mode(), "cluster-unsupport")) { + if (!IsBatchRequestSupportMode(sql_case.mode())) { LOG(WARNING) << "Unsupport mode: " << sql_case.mode(); return; } @@ -231,10 +246,7 @@ TEST_P(SQLSDKQueryTest, sql_sdk_distribute_batch_request_single_partition_test) } TEST_P(SQLSDKBatchRequestQueryTest, sql_sdk_distribute_batch_request_single_partition_test) { auto sql_case = GetParam(); - if 
(boost::contains(sql_case.mode(), "rtidb-unsupport") || - boost::contains(sql_case.mode(), "rtidb-request-unsupport") || - boost::contains(sql_case.mode(), "request-unsupport") || - boost::contains(sql_case.mode(), "cluster-unsupport")) { + if (!IsBatchRequestSupportMode(sql_case.mode())) { LOG(WARNING) << "Unsupport mode: " << sql_case.mode(); return; } @@ -267,10 +279,7 @@ TEST_P(SQLSDKBatchRequestQueryTest, sql_sdk_distribute_batch_request_single_part TEST_P(SQLSDKBatchRequestQueryTest, sql_sdk_distribute_batch_request_procedure_test) { auto sql_case = GetParam(); - if (boost::contains(sql_case.mode(), "rtidb-unsupport") || - boost::contains(sql_case.mode(), "rtidb-request-unsupport") || - boost::contains(sql_case.mode(), "request-unsupport") || - boost::contains(sql_case.mode(), "cluster-unsupport")) { + if (!IsBatchRequestSupportMode(sql_case.mode())) { LOG(WARNING) << "Unsupport mode: " << sql_case.mode(); return; } @@ -288,10 +297,7 @@ TEST_P(SQLSDKBatchRequestQueryTest, sql_sdk_distribute_batch_request_procedure_t TEST_P(SQLSDKQueryTest, sql_sdk_distribute_request_procedure_test) { auto sql_case = GetParam(); LOG(INFO) << "ID: " << sql_case.id() << ", DESC: " << sql_case.desc(); - if (boost::contains(sql_case.mode(), "rtidb-unsupport") || - boost::contains(sql_case.mode(), "rtidb-request-unsupport") || - boost::contains(sql_case.mode(), "request-unsupport") || - boost::contains(sql_case.mode(), "cluster-unsupport")) { + if (!IsRequestSupportMode(sql_case.mode())) { LOG(WARNING) << "Unsupport mode: " << sql_case.mode(); return; } @@ -302,10 +308,7 @@ TEST_P(SQLSDKQueryTest, sql_sdk_distribute_request_procedure_test) { } TEST_P(SQLSDKBatchRequestQueryTest, sql_sdk_distribute_batch_request_procedure_async_test) { auto sql_case = GetParam(); - if (boost::contains(sql_case.mode(), "rtidb-unsupport") || - boost::contains(sql_case.mode(), "rtidb-request-unsupport") || - boost::contains(sql_case.mode(), "request-unsupport") || - boost::contains(sql_case.mode(), "cluster-unsupport")) { + if (!IsRequestSupportMode(sql_case.mode())) { LOG(WARNING) << "Unsupport mode: " << sql_case.mode(); return; } @@ -323,10 +326,7 @@ TEST_P(SQLSDKBatchRequestQueryTest, sql_sdk_distribute_batch_request_procedure_a TEST_P(SQLSDKQueryTest, sql_sdk_distribute_request_procedure_async_test) { auto sql_case = GetParam(); LOG(INFO) << "ID: " << sql_case.id() << ", DESC: " << sql_case.desc(); - if (boost::contains(sql_case.mode(), "rtidb-unsupport") || - boost::contains(sql_case.mode(), "rtidb-request-unsupport") || - boost::contains(sql_case.mode(), "request-unsupport") || - boost::contains(sql_case.mode(), "cluster-unsupport")) { + if (!IsRequestSupportMode(sql_case.mode())) { LOG(WARNING) << "Unsupport mode: " << sql_case.mode(); return; } diff --git a/src/sdk/sql_request_row.cc b/src/sdk/sql_request_row.cc index c008755a908..9feb90df802 100644 --- a/src/sdk/sql_request_row.cc +++ b/src/sdk/sql_request_row.cc @@ -92,6 +92,7 @@ bool SQLRequestRow::Init(int32_t str_length) { return true; } has_error_ = false; + is_ok_ = false; str_length_expect_ = str_length; str_length_current_ = 0; uint32_t total_length = str_field_start_offset_; diff --git a/src/sdk/sql_router.h b/src/sdk/sql_router.h index de03b1c06df..e882546a502 100644 --- a/src/sdk/sql_router.h +++ b/src/sdk/sql_router.h @@ -87,7 +87,9 @@ class SQLRouter { virtual std::shared_ptr GetRequestRow(const std::string& db, const std::string& sql, hybridse::sdk::Status* status) = 0; - + virtual std::shared_ptr GetRequestRow( + const std::string& db, const 
std::string& sql, const std::shared_ptr<::hybridse::sdk::Schema> parameter_schema, + hybridse::sdk::Status* status) = 0; virtual std::shared_ptr GetRequestRowByProcedure(const std::string& db, const std::string& sp_name, ::hybridse::sdk::Status* status) = 0; @@ -98,13 +100,17 @@ class SQLRouter { virtual std::shared_ptr GetInsertRows(const std::string& db, const std::string& sql, ::hybridse::sdk::Status* status) = 0; - virtual std::shared_ptr ExecuteSQL(const std::string& db, const std::string& sql, - std::shared_ptr row, - hybridse::sdk::Status* status) = 0; + virtual std::shared_ptr ExecuteSQLRequest( + const std::string& db, const std::string& sql, std::shared_ptr row, + hybridse::sdk::Status* status) = 0; virtual std::shared_ptr ExecuteSQL(const std::string& db, const std::string& sql, hybridse::sdk::Status* status) = 0; + virtual std::shared_ptr ExecuteSQLParameterized( + const std::string& db, const std::string& sql, std::shared_ptr parameter, + hybridse::sdk::Status* status) = 0; + virtual std::shared_ptr ExecuteSQLBatchRequest( const std::string& db, const std::string& sql, std::shared_ptr row_batch, ::hybridse::sdk::Status* status) = 0; diff --git a/src/sdk/sql_router_test.cc b/src/sdk/sql_router_test.cc index c6465d4e6f9..af4e6a1703e 100644 --- a/src/sdk/sql_router_test.cc +++ b/src/sdk/sql_router_test.cc @@ -827,7 +827,7 @@ TEST_F(SQLRouterTest, smoketest_on_sql) { " window w as (partition by " + name + ".col1 order by " + name + ".col2 ROWS BETWEEN 3 PRECEDING AND CURRENT ROW);"; - rs = router->ExecuteSQL(db, sql_window_request, row, &status); + rs = router->ExecuteSQLRequest(db, sql_window_request, row, &status); ASSERT_TRUE(rs != nullptr); ASSERT_EQ(1, rs->Size()); ASSERT_TRUE(rs->Next()); @@ -846,7 +846,7 @@ TEST_F(SQLRouterTest, smoketest_on_sql) { " window w as (partition by " + name + ".col1 order by " + name + ".col2 ROWS BETWEEN 3 PRECEDING AND CURRENT ROW);"; - rs = router->ExecuteSQL(db, sql_window_request, row, &status); + rs = router->ExecuteSQLRequest(db, sql_window_request, row, &status); ASSERT_TRUE(rs != nullptr); ASSERT_EQ(1, rs->Size()); ASSERT_TRUE(rs->Next()); diff --git a/src/sdk/sql_sdk_test.cc b/src/sdk/sql_sdk_test.cc index 2455cc7da3b..4de3a05bd35 100644 --- a/src/sdk/sql_sdk_test.cc +++ b/src/sdk/sql_sdk_test.cc @@ -47,23 +47,38 @@ static std::shared_ptr GetNewSQLRouter() { sql_opt.enable_debug = hybridse::sqlcase::SqlCase::IsDebug(); return NewClusterSQLRouter(sql_opt); } - -static bool IsSupportMode(const std::string& mode) { - if (mode.find("rtidb-unsupport") != std::string::npos || +static bool IsRequestSupportMode(const std::string& mode) { + if (mode.find("hybridse-only") != std::string::npos || + mode.find("rtidb-unsupport") != std::string::npos || mode.find("request-unsupport") != std::string::npos || mode.find("standalone-unsupport") != std::string::npos) { return false; } return true; } - +static bool IsBatchRequestSupportMode(const std::string& mode) { + if (mode.find("hybridse-only") != std::string::npos || + mode.find("rtidb-unsupport") != std::string::npos || + mode.find("batch-request-unsupport") != std::string::npos || + mode.find("request-unsupport") != std::string::npos + || mode.find("standalone-unsupport") != std::string::npos) { + return false; + } + return true; +} +static bool IsBatchSupportMode(const std::string& mode) { + if (mode.find("hybridse-only") != std::string::npos || + mode.find("rtidb-unsupport") != std::string::npos || + mode.find("batch-unsupport") != std::string::npos + || mode.find("standalone-unsupport") != 
std::string::npos) { + return false; + } + return true; +} TEST_P(SQLSDKTest, sql_sdk_batch_test) { auto sql_case = GetParam(); LOG(INFO) << "ID: " << sql_case.id() << ", DESC: " << sql_case.desc(); - if (boost::contains(sql_case.mode(), "rtidb-unsupport") || - boost::contains(sql_case.mode(), "rtidb-batch-unsupport") || - boost::contains(sql_case.mode(), "batch-unsupport") || - boost::contains(sql_case.mode(), "standalone-unsupport")) { + if (!IsBatchSupportMode(sql_case.mode())) { LOG(WARNING) << "Unsupport mode: " << sql_case.mode(); return; } @@ -83,7 +98,7 @@ TEST_P(SQLSDKTest, sql_sdk_batch_test) { TEST_P(SQLSDKQueryTest, sql_sdk_request_test) { auto sql_case = GetParam(); LOG(INFO) << "ID: " << sql_case.id() << ", DESC: " << sql_case.desc(); - if (!IsSupportMode(sql_case.mode())) { + if (!IsRequestSupportMode(sql_case.mode())) { LOG(WARNING) << "Unsupport mode: " << sql_case.mode(); return; } @@ -94,7 +109,7 @@ TEST_P(SQLSDKQueryTest, sql_sdk_request_test) { TEST_P(SQLSDKQueryTest, sql_sdk_batch_request_test) { auto sql_case = GetParam(); LOG(INFO) << "ID: " << sql_case.id() << ", DESC: " << sql_case.desc(); - if (!IsSupportMode(sql_case.mode())) { + if (!IsBatchRequestSupportMode(sql_case.mode())) { LOG(WARNING) << "Unsupport mode: " << sql_case.mode(); return; } @@ -106,10 +121,7 @@ TEST_P(SQLSDKQueryTest, sql_sdk_batch_request_test) { TEST_P(SQLSDKQueryTest, sql_sdk_batch_test) { auto sql_case = GetParam(); LOG(INFO) << "ID: " << sql_case.id() << ", DESC: " << sql_case.desc(); - if (boost::contains(sql_case.mode(), "rtidb-unsupport") || - boost::contains(sql_case.mode(), "rtidb-batch-unsupport") || - boost::contains(sql_case.mode(), "batch-unsupport") || - boost::contains(sql_case.mode(), "standalone-unsupport")) { + if (!IsBatchSupportMode(sql_case.mode())) { LOG(WARNING) << "Unsupport mode: " << sql_case.mode(); return; } @@ -120,7 +132,7 @@ TEST_P(SQLSDKQueryTest, sql_sdk_batch_test) { TEST_P(SQLSDKQueryTest, sql_sdk_request_procedure_test) { auto sql_case = GetParam(); LOG(INFO) << "ID: " << sql_case.id() << ", DESC: " << sql_case.desc(); - if (!IsSupportMode(sql_case.mode())) { + if (!IsRequestSupportMode(sql_case.mode())) { LOG(WARNING) << "Unsupport mode: " << sql_case.mode(); return; } @@ -132,7 +144,7 @@ TEST_P(SQLSDKQueryTest, sql_sdk_request_procedure_test) { TEST_P(SQLSDKQueryTest, sql_sdk_request_procedure_asyn_test) { auto sql_case = GetParam(); LOG(INFO) << "ID: " << sql_case.id() << ", DESC: " << sql_case.desc(); - if (!IsSupportMode(sql_case.mode())) { + if (!IsRequestSupportMode(sql_case.mode())) { LOG(WARNING) << "Unsupport mode: " << sql_case.mode(); return; } @@ -142,7 +154,7 @@ TEST_P(SQLSDKQueryTest, sql_sdk_request_procedure_asyn_test) { } TEST_P(SQLSDKBatchRequestQueryTest, sql_sdk_batch_request_test) { auto sql_case = GetParam(); - if (!IsSupportMode(sql_case.mode())) { + if (!IsBatchRequestSupportMode(sql_case.mode())) { LOG(WARNING) << "Unsupport mode: " << sql_case.mode(); return; } @@ -157,7 +169,7 @@ TEST_P(SQLSDKBatchRequestQueryTest, sql_sdk_batch_request_test) { } TEST_P(SQLSDKBatchRequestQueryTest, sql_sdk_batch_request_procedure_test) { auto sql_case = GetParam(); - if (!IsSupportMode(sql_case.mode())) { + if (!IsBatchRequestSupportMode(sql_case.mode())) { LOG(WARNING) << "Unsupport mode: " << sql_case.mode(); return; } @@ -173,7 +185,7 @@ TEST_P(SQLSDKBatchRequestQueryTest, sql_sdk_batch_request_procedure_test) { TEST_P(SQLSDKBatchRequestQueryTest, sql_sdk_batch_request_procedure_asyn_test) { auto sql_case = GetParam(); - if 
(!IsSupportMode(sql_case.mode())) { + if (!IsBatchRequestSupportMode(sql_case.mode())) { LOG(WARNING) << "Unsupport mode: " << sql_case.mode(); return; } @@ -594,6 +606,159 @@ TEST_F(SQLSDKTest, create_table) { ASSERT_TRUE(router->DropDB(db, &status)); } +TEST_F(SQLSDKQueryTest, execute_where_with_parameter) { + std::string ddl = + "create table trans(c_sk_seq string,\n" + " cust_no string,\n" + " pay_cust_name string,\n" + " pay_card_no string,\n" + " payee_card_no string,\n" + " card_type string,\n" + " merch_id string,\n" + " txn_datetime string,\n" + " txn_amt double,\n" + " txn_curr string,\n" + " card_balance double,\n" + " day_openbuy double,\n" + " credit double,\n" + " remainning_credit double,\n" + " indi_openbuy double,\n" + " lgn_ip string,\n" + " IEMI string,\n" + " client_mac string,\n" + " chnl_type int32,\n" + " cust_idt int32,\n" + " cust_idt_no string,\n" + " province string,\n" + " city string,\n" + " latitudeandlongitude string,\n" + " txn_time int64,\n" + " index(key=pay_card_no, ts=txn_time),\n" + " index(key=merch_id, ts=txn_time));"; + SQLRouterOptions sql_opt; + sql_opt.session_timeout = 30000; + sql_opt.zk_cluster = mc_->GetZkCluster(); + sql_opt.zk_path = mc_->GetZkPath(); + sql_opt.enable_debug = hybridse::sqlcase::SqlCase::IsDebug(); + auto router = NewClusterSQLRouter(sql_opt); + if (!router) { + FAIL() << "Fail new cluster sql router"; + } + std::string db = "execute_where_with_parameter"; + hybridse::sdk::Status status; + ASSERT_TRUE(router->CreateDB(db, &status)); + ASSERT_TRUE(router->ExecuteDDL(db, ddl, &status)); + ASSERT_TRUE(router->RefreshCatalog()); + int64_t ts = 1594800959827; + // Insert 3 rows into table trans + { + char buffer[4096]; + sprintf(buffer, // NOLINT + "insert into trans " + "values('c_sk_seq0','cust_no0','pay_cust_name0','card_%d','" + "payee_card_no0','card_type0','mc_%d','2020-" + "10-20 " + "10:23:50',1.0,'txn_curr',2.0,3.0,4.0,5.0,6.0,'lgn_ip0','iemi0'" + ",'client_mac0',10,20,'cust_idt_no0','" + "province0'," + "'city0', 'longitude', %s);", + 0, 0, std::to_string(ts++).c_str()); // NOLINT + std::string insert_sql = std::string(buffer, strlen(buffer)); + ASSERT_TRUE(router->ExecuteInsert(db, insert_sql, &status)); + } + { + char buffer[4096]; + sprintf(buffer, // NOLINT + "insert into trans " + "values('c_sk_seq0','cust_no0','pay_cust_name0','card_%d','" + "payee_card_no0','card_type0','mc_%d','2020-" + "10-20 " + "10:23:50',1.0,'txn_curr',2.0,3.0,4.0,5.0,6.0,'lgn_ip0','iemi0'" + ",'client_mac0',10,20,'cust_idt_no0','" + "province0'," + "'city0', 'longitude', %s);", + 0, 0, std::to_string(ts++).c_str()); // NOLINT + std::string insert_sql = std::string(buffer, strlen(buffer)); + ASSERT_TRUE(router->ExecuteInsert(db, insert_sql, &status)); + } + { + char buffer[4096]; + sprintf(buffer, // NOLINT + "insert into trans " + "values('c_sk_seq0','cust_no0','pay_cust_name0','card_%d','" + "payee_card_no0','card_type0','mc_%d','2020-" + "10-20 " + "10:23:50',1.0,'txn_curr',2.0,3.0,4.0,5.0,6.0,'lgn_ip0','iemi0'" + ",'client_mac0',10,20,'cust_idt_no0','" + "province0'," + "'city0', 'longitude', %s);", + 0, 0, std::to_string(ts++).c_str()); // NOLINT + std::string insert_sql = std::string(buffer, strlen(buffer)); + ASSERT_TRUE(router->ExecuteInsert(db, insert_sql, &status)); + } + + + hybridse::codec::Schema schema; + { + auto column = schema.Add(); + column->set_type(::hybridse::type::kVarchar); + } + { + auto column = schema.Add(); + column->set_type(::hybridse::type::kInt64); + } + const std::shared_ptr<::hybridse::sdk::Schema> 
schema_impl = std::make_shared(schema); + std::string where_exist = "select * from trans where merch_id = ? and txn_time < ?;"; + // parameterized query + auto parameter_row = std::make_shared(schema_impl, std::set()); + { + ASSERT_EQ(2, parameter_row->GetSchema()->GetColumnCnt()); + ASSERT_TRUE(parameter_row->Init(4)); + ASSERT_TRUE(parameter_row->AppendString("mc_0")); + ASSERT_TRUE(parameter_row->AppendInt64(1594800959830)); + ASSERT_TRUE(parameter_row->Build()); + + auto rs = router->ExecuteSQLParameterized(db, where_exist, parameter_row, &status); + + if (!rs) { + FAIL() << "fail to execute sql"; + } + ASSERT_EQ(rs->Size(), 3); + } + { + ASSERT_TRUE(parameter_row->Init(4)); + ASSERT_TRUE(parameter_row->AppendString("mc_0")); + ASSERT_TRUE(parameter_row->AppendInt64(1594800959828)); + ASSERT_TRUE(parameter_row->Build()); + auto rs = router->ExecuteSQLParameterized(db, where_exist, parameter_row, &status); + if (!rs) { + FAIL() << "fail to execute sql"; + } + ASSERT_EQ(rs->Size(), 1); + } + { + parameter_row->Init(4); + parameter_row->AppendString("mc_0"); + parameter_row->AppendInt64(1594800959827); + parameter_row->Build(); + auto rs = router->ExecuteSQLParameterized(db, where_exist, parameter_row, &status); + if (!rs) { + FAIL() << "fail to execute sql"; + } + ASSERT_EQ(rs->Size(), 0); + } + { + parameter_row->Init(4); + parameter_row->AppendString("mc_1"); + parameter_row->AppendInt64(1594800959830); + parameter_row->Build(); + auto rs = router->ExecuteSQLParameterized(db, where_exist, parameter_row, &status); + if (!rs) { + FAIL() << "fail to execute sql"; + } + ASSERT_EQ(rs->Size(), 0); + } +} } // namespace sdk } // namespace openmldb diff --git a/src/sdk/sql_sdk_test.h b/src/sdk/sql_sdk_test.h index ed9c4784d26..a15ba66787e 100644 --- a/src/sdk/sql_sdk_test.h +++ b/src/sdk/sql_sdk_test.h @@ -211,9 +211,10 @@ void SQLSDKTest::CreateProcedure(hybridse::sqlcase::SqlCase& sql_case, // NOLIN std::string create_sp; if (is_batch) { hybridse::type::TableDef batch_request_schema; - ASSERT_TRUE(sql_case.ExtractTableDef(sql_case.batch_request().columns_, sql_case.batch_request().indexs_, batch_request_schema)); - ASSERT_TRUE( - sql_case.BuildCreateSpSqlFromSchema(batch_request_schema, sql, sql_case.batch_request().common_column_indices_, &create_sp)); + ASSERT_TRUE(sql_case.ExtractTableDef(sql_case.batch_request().columns_, sql_case.batch_request().indexs_, + batch_request_schema)); + ASSERT_TRUE(sql_case.BuildCreateSpSqlFromSchema(batch_request_schema, sql, + sql_case.batch_request().common_column_indices_, &create_sp)); } else { std::set common_idx; ASSERT_TRUE(sql_case.BuildCreateSpSqlFromInput(0, sql, common_idx, &create_sp)); @@ -368,7 +369,6 @@ void SQLSDKTest::CovertHybridSERowToRequestRow(hybridse::codec::RowView* row_vie } ASSERT_TRUE(request_row->Build()); } - void SQLSDKTest::BatchExecuteSQL(hybridse::sqlcase::SqlCase& sql_case, // NOLINT std::shared_ptr router, const std::vector& tbEndpoints) { DLOG(INFO) << "BatchExecuteSQL BEGIN"; @@ -381,17 +381,36 @@ void SQLSDKTest::BatchExecuteSQL(hybridse::sqlcase::SqlCase& sql_case, // NOLIN boost::replace_all(sql, placeholder, sql_case.inputs()[i].name_); } DLOG(INFO) << "format sql 1"; - boost::replace_all(sql, "{auto}", hybridse::sqlcase::SqlCase::GenRand("auto_t") + - std::to_string((int64_t)time(NULL))); - for(size_t endpoint_id = 0; endpoint_id < tbEndpoints.size(); endpoint_id++) { - boost::replace_all(sql, "{tb_endpoint_"+std::to_string(endpoint_id)+"}", tbEndpoints.at(endpoint_id)); + boost::replace_all(sql, "{auto}", + 
hybridse::sqlcase::SqlCase::GenRand("auto_t") + std::to_string((int64_t)time(NULL))); + for (size_t endpoint_id = 0; endpoint_id < tbEndpoints.size(); endpoint_id++) { + boost::replace_all(sql, "{tb_endpoint_" + std::to_string(endpoint_id) + "}", tbEndpoints.at(endpoint_id)); } DLOG(INFO) << "format sql done"; LOG(INFO) << sql; std::string lower_sql = sql; boost::to_lower(lower_sql); if (boost::algorithm::starts_with(lower_sql, "select")) { - auto rs = router->ExecuteSQL(sql_case.db(), sql, &status); + std::shared_ptr rs; + // parameterized batch query + if (!sql_case.parameters().columns_.empty()) { + auto parameter_schema = sql_case.ExtractParameterTypes(); + std::shared_ptr parameter_row = + std::make_shared( + std::make_shared(parameter_schema), std::set()); + hybridse::codec::RowView row_view(parameter_schema); + std::vector parameter_rows; + sql_case.ExtractRows(parameter_schema, sql_case.parameters().rows_, parameter_rows); + if (parameter_rows.empty()) { + FAIL() << "sql case parameter rows extract fail"; + return; + } + row_view.Reset(parameter_rows[0].buf()); + CovertHybridSERowToRequestRow(&row_view, parameter_row); + rs = router->ExecuteSQLParameterized(sql_case.db(), sql, parameter_row, &status); + } else { + rs = router->ExecuteSQL(sql_case.db(), sql, &status); + } if (!sql_case.expect().success_) { if ((rs)) { FAIL() << "sql case expect success == false"; @@ -504,7 +523,7 @@ void SQLSDKQueryTest::RequestExecuteSQL(hybridse::sqlcase::SqlCase& sql_case, / rs = router->CallProcedure(sql_case.db(), sql_case.sp_name_, request_row, &status); } } else { - rs = router->ExecuteSQL(sql_case.db(), sql, request_row, &status); + rs = router->ExecuteSQLRequest(sql_case.db(), sql, request_row, &status); } if (!rs || status.code != 0) FAIL() << "sql case expect success == true" << status.msg; results.push_back(rs); @@ -808,7 +827,8 @@ INSTANTIATE_TEST_SUITE_P(SQLSDKTestConstsSelect, SQLSDKQueryTest, INSTANTIATE_TEST_SUITE_P(SQLSDKLastJoinWindowQuery, SQLSDKQueryTest, testing::ValuesIn(SQLSDKQueryTest::InitCases("/cases/query/last_join_window_query.yaml"))); - +INSTANTIATE_TEST_SUITE_P(SQLSDKParameterizedQuery, SQLSDKQueryTest, + testing::ValuesIn(SQLSDKQueryTest::InitCases("/cases/query/parameterized_query.yaml"))); // Test Cluster INSTANTIATE_TEST_SUITE_P( diff --git a/src/tablet/tablet_impl.cc b/src/tablet/tablet_impl.cc index 7e6896f0d75..87dc7a97119 100644 --- a/src/tablet/tablet_impl.cc +++ b/src/tablet/tablet_impl.cc @@ -52,6 +52,7 @@ #include "storage/binlog.h" #include "storage/segment.h" #include "tablet/file_sender.h" +#include "catalog/schema_adapter.h" using google::protobuf::RepeatedPtrField; using ::openmldb::base::ReturnCode; @@ -1483,10 +1484,25 @@ void TabletImpl::ProcessQuery(RpcController* ctrl, const openmldb::api::QueryReq ::openmldb::api::QueryResponse* response, butil::IOBuf* buf) { ::hybridse::base::Status status; if (request->is_batch()) { + // convert repeated openmldb:type::DataType into hybridse::codec::Schema + hybridse::codec::Schema parameter_schema; + for (int i = 0; i < request->parameter_types().size(); i++) { + auto column = parameter_schema.Add(); + hybridse::type::Type hybridse_type; + + if (!openmldb::catalog::SchemaAdapter::ConvertType(request->parameter_types(i), &hybridse_type)) { + response->set_msg("Invalid parameter type: " + + openmldb::type::DataType_Name(request->parameter_types(i))); + response->set_code(::openmldb::base::kSQLCompileError); + return; + } + column->set_type(hybridse_type); + } ::hybridse::vm::BatchRunSession session; if 
(request->is_debug()) { session.EnableDebug(); } + session.SetParameterSchema(parameter_schema); { bool ok = engine_->Get(request->sql(), request->db(), session, status); if (!ok) { @@ -1497,26 +1513,26 @@ void TabletImpl::ProcessQuery(RpcController* ctrl, const openmldb::api::QueryReq } } - auto table = session.Run(); - if (!table) { - DLOG(WARNING) << "fail to run sql " << request->sql(); + ::hybridse::codec::Row parameter_row; + auto& request_buf = static_cast(ctrl)->request_attachment(); + if (request->parameter_row_size() > 0 && + !codec::DecodeRpcRow(request_buf, 0, request->parameter_row_size(), request->parameter_row_slices(), + ¶meter_row)) { response->set_code(::openmldb::base::kSQLRunError); - response->set_msg("fail to run sql"); + response->set_msg("fail to decode parameter row"); return; } - auto iter = table->GetIterator(); - if (!iter) { - response->set_schema(session.GetEncodedSchema()); - response->set_byte_size(0); - response->set_count(0); - response->set_code(::openmldb::base::kOk); + std::vector<::hybridse::codec::Row> output_rows; + int32_t run_ret = session.Run(parameter_row, output_rows); + if (run_ret != 0) { + response->set_msg(status.msg); + response->set_code(::openmldb::base::kSQLRunError); + DLOG(WARNING) << "fail to run sql: " << request->sql(); return; } - iter->SeekToFirst(); uint32_t byte_size = 0; uint32_t count = 0; - while (iter->Valid()) { - const ::hybridse::codec::Row& row = iter->GetValue(); + for (auto& output_row : output_rows) { if (byte_size > FLAGS_scan_max_bytes_size) { LOG(WARNING) << "reach the max byte size truncate result"; response->set_schema(session.GetEncodedSchema()); @@ -1525,9 +1541,8 @@ void TabletImpl::ProcessQuery(RpcController* ctrl, const openmldb::api::QueryReq response->set_code(::openmldb::base::kOk); return; } - byte_size += row.size(); - iter->Next(); - buf->append(reinterpret_cast(row.buf()), row.size()); + byte_size += output_row.size(); + buf->append(reinterpret_cast(output_row.buf()), output_row.size()); count += 1; } response->set_schema(session.GetEncodedSchema()); @@ -1692,9 +1707,9 @@ void TabletImpl::ProcessBatchRequestQuery(RpcController* ctrl, std::vector<::hybridse::codec::Row> output_rows; int32_t run_ret = 0; if (request->has_task_id()) { - session.Run(request->task_id(), input_rows, output_rows); + run_ret = session.Run(request->task_id(), input_rows, output_rows); } else { - session.Run(input_rows, output_rows); + run_ret = session.Run(input_rows, output_rows); } if (run_ret != 0) { response->set_msg(status.msg); diff --git a/steps/init_env.sh b/steps/init_env.sh index e4f2af8b7a0..af4fae41b7a 100644 --- a/steps/init_env.sh +++ b/steps/init_env.sh @@ -20,49 +20,66 @@ set -eE pushd "$(dirname "$0")/.." ROOT=$(pwd) - ARCH=$(arch) - -echo "Install thirdparty ... for $(uname -a)" +# hybridse is required downloaded or install locally +HYBRIDSE_SOURCE=$1 # on local machine, one can tweak thirdparty path by passing extra argument -THIRDPARTY_PATH=${1:-"$ROOT/thirdparty"} +THIRDPARTY_PATH=${2:-"$ROOT/thirdparty"} THIRDSRC_PATH="$ROOT/thirdsrc" -if [ -d "$THIRDPARTY_PATH" ]; then +echo "THIRDPARTY_PATH: "${THIRDPARTY_PATH} +echo "Install thirdparty ... 
for $(uname -a)" + +if [[ -d "$THIRDPARTY_PATH" ]]; then echo "thirdparty path: $THIRDPARTY_PATH already exist, skip download deps" - exit 0 +else + ./steps/setup_thirdparty.sh ${THIRDPARTY_PATH} fi -mkdir -p "$THIRDPARTY_PATH" -mkdir -p "$THIRDPARTY_PATH/hybridse" -mkdir -p "$THIRDSRC_PATH" +if [[ -d "THIRDSRC_PATH" ]]; then + echo "thirdsrc path: THIRDSRC_PATH already exist, skip download deps" +else + curl -SLo thirdsrc.tar.gz https://github.com/jingchen2222/hybridsql-asserts/releases/download/v0.4.0/thirdsrc-2021-08-03.tar.gz + tar xzf thirdsrc.tar.gz -C "${THIRDSRC_PATH}" --strip-components 1 +fi -pushd "${THIRDSRC_PATH}" +if [ -d "$THIRDPARTY_PATH/hybridse" ]; then + echo "thirdparty/hybridse path: $THIRDPARTY_PATH/hybridse already exist, skip download/install deps" + exit 0 +fi +echo "HYBRIDSE_SOURCE: "${HYBRIDSE_SOURCE} +if [[ ${HYBRIDSE_SOURCE} = "local" ]]; then + echo "Install hybridse locally" + cd "${ROOT}/hybridse" + ln -sf $THIRDPARTY_PATH thirdparty + ln -sf $THIRDSRC_PATH thirdsrc + if uname -a | grep -q Darwin; then + # in case coreutils not install on mac + alias nproc='sysctl -n hw.logicalcpu' + fi + rm -rf build + mkdir -p build && cd build + cmake .. -DCMAKE_BUILD_TYPE=Release -DTESTING_ENABLE=OFF -DEXAMPLES_ENABLE=OFF -DCMAKE_INSTALL_PREFIX="hybridse" + make -j"$(nproc)" install + mv hybridse ${THIRDPARTY_PATH}/hybridse +else + echo "Download hybridse package" + pushd "${THIRDSRC_PATH}" -if [[ "$OSTYPE" = "darwin"* ]]; then - curl -SLo thirdparty.tar.gz https://github.com/jingchen2222/hybridsql-asserts/releases/download/v0.4.0/thirdparty-2021-08-03-darwin-x86_64.tar.gz - curl -SLo libzetasql.tar.gz https://github.com/jingchen2222/zetasql/releases/download/v0.2.0/libzetasql-0.2.0-darwin-x86_64.tar.gz - curl -SLo hybridse.tar.gz https://github.com/4paradigm/HybridSE/releases/download/v0.2.1/hybridse-0.2.1-darwin-x86_64.tar.gz -elif [[ "$OSTYPE" = "linux-gnu"* ]]; then + if [[ "$OSTYPE" = "darwin"* ]]; then + curl -SLo hybridse.tar.gz https://github.com/jingchen2222/OpenMLDB/releases/download/hybridse-v0.3.0-alpha-0817/hybridse-0.3.0-alpha-0817-darwin-x86_64.tar.gz + elif [[ "$OSTYPE" = "linux-gnu"* ]]; then if [[ $ARCH = 'x86_64' ]]; then - curl -SLo thirdparty.tar.gz https://github.com/jingchen2222/hybridsql-asserts/releases/download/v0.4.0/thirdparty-2021-08-03-linux-gnu-x86_64.tar.gz - curl -SLo libzetasql.tar.gz https://github.com/jingchen2222/zetasql/releases/download/v0.2.0/libzetasql-0.2.0-linux-x86_64.tar.gz - curl -SLo hybridse.tar.gz https://github.com/4paradigm/HybridSE/releases/download/v0.2.1/hybridse-0.2.1-linux-x86_64.tar.gz + curl -SLo hybridse.tar.gz https://github.com/jingchen2222/OpenMLDB/releases/download/hybridse-v0.3.0-alpha-0817/hybridse-0.3.0-alpha-0817-linux-x86_64.tar.gz elif [[ $ARCH = 'aarch64' ]]; then - curl -SLo thirdparty.tar.gz https://github.com/jingchen2222/hybridsql-asserts/releases/download/v0.4.0/thirdparty-2021-08-03-linux-gnu-aarch64.tar.gz - curl -SLo libzetasql.tar.gz https://github.com/aceforeverd/zetasql/releases/download/v0.2.1-beta5/libzetasql-0.2.1-beta5-linux-gnu-aarch64.tar.gz # NOTE: missing hybridse-aarch64 + echo "missing hybridse-aarch64" fi + fi + mkdir -p "$THIRDPARTY_PATH/hybridse" + tar xzf hybridse.tar.gz -C "${THIRDPARTY_PATH}/hybridse" --strip-components 1 + popd fi +cd ${ROOT} -curl -SLo thirdsrc.tar.gz https://github.com/jingchen2222/hybridsql-asserts/releases/download/v0.4.0/thirdsrc-2021-08-03.tar.gz - -tar xzf thirdparty.tar.gz -C "$THIRDPARTY_PATH" --strip-components 1 -tar xzf libzetasql.tar.gz -C 
"$THIRDPARTY_PATH" --strip-components 1 -tar xzf hybridse.tar.gz -C "$THIRDPARTY_PATH/hybridse" --strip-components 1 -tar xzf thirdsrc.tar.gz -C "$THIRDSRC_PATH" --strip-components 1 - -popd - -popd diff --git a/hybridse/tools/setup_thirdparty.sh b/steps/setup_thirdparty.sh similarity index 99% rename from hybridse/tools/setup_thirdparty.sh rename to steps/setup_thirdparty.sh index d52b99c97ef..b8100cb60fc 100755 --- a/hybridse/tools/setup_thirdparty.sh +++ b/steps/setup_thirdparty.sh @@ -54,5 +54,4 @@ fi tar xzf thirdparty.tar.gz -C "${THIRDPARTY_PATH}" --strip-components 1 tar xzf libzetasql.tar.gz -C "${THIRDPARTY_PATH}" --strip-components 1 popd - popd