From fbba591952b4f252fcef4ba80aacbd77a5c4b539 Mon Sep 17 00:00:00 2001 From: tobe Date: Wed, 1 Mar 2023 14:58:44 +0800 Subject: [PATCH 1/2] Get job fail status for show job commands --- src/sdk/sql_cluster_router.cc | 51 ++++++++++++++++++++++++++--------- src/sdk/sql_cluster_router.h | 2 +- 2 files changed, 40 insertions(+), 13 deletions(-) diff --git a/src/sdk/sql_cluster_router.cc b/src/sdk/sql_cluster_router.cc index 313a964bfbe..95cbcd54f47 100644 --- a/src/sdk/sql_cluster_router.cc +++ b/src/sdk/sql_cluster_router.cc @@ -1837,7 +1837,13 @@ std::shared_ptr SQLClusterRouter::HandleSQLCmd(const h *status = {StatusCode::kCmdError, "Failed to parse job id: " + cmd_node->GetArgs()[0]}; return {}; } - return this->GetJobResultSet(job_id); + + auto rs = this->GetJobResultSet(job_id, status); + if (status->IsOK()) { + return rs; + } else { + return {}; + } } case hybridse::node::kCmdShowJobLog: { int job_id; @@ -1873,7 +1879,13 @@ std::shared_ptr SQLClusterRouter::HandleSQLCmd(const h ::openmldb::taskmanager::JobInfo job_info; StopJob(job_id, &job_info); - return this->GetJobResultSet(job_id); + + auto rs = this->GetJobResultSet(job_id, status); + if (status->IsOK()) { + return rs; + } else { + return {}; + } } case hybridse::node::kCmdDropTable: { *status = {}; @@ -2563,7 +2575,12 @@ std::shared_ptr SQLClusterRouter::ExecuteSQL( ReadSparkConfFromFile(std::dynamic_pointer_cast(options_)->spark_conf_path, &config); auto base_status = ExportOfflineData(sql, config, db, is_sync_job, offline_job_timeout, &job_info); if (base_status.OK()) { - return this->GetJobResultSet(job_info.id()); + auto rs = this->GetJobResultSet(job_info.id(), status); + if (status->IsOK()) { + return rs; + } else { + return {}; + } } else { *status = {StatusCode::kCmdError, base_status.msg}; } @@ -2617,7 +2634,12 @@ std::shared_ptr SQLClusterRouter::ExecuteSQL( base_status = ImportOfflineData(sql, config, database, is_sync_job, offline_job_timeout, &job_info); } if (base_status.OK() && job_info.id() > 0) { - return this->GetJobResultSet(job_info.id()); + auto rs = this->GetJobResultSet(job_info.id(), status); + if (status->IsOK()) { + return rs; + } else { + return {}; + } } else { APPEND_FROM_BASE_AND_WARN(status, base_status, "taskmanager load data failed"); } @@ -2675,7 +2697,13 @@ std::shared_ptr SQLClusterRouter::ExecuteOfflineQuery( APPEND_FROM_BASE_AND_WARN(status, base_status, "async offline query failed"); return {}; } - return this->GetJobResultSet(job_info.id()); + + auto rs = this->GetJobResultSet(job_info.id(), status); + if (status->IsOK()) { + return rs; + } else { + return {}; + } } } @@ -4191,19 +4219,18 @@ void SQLClusterRouter::ReadSparkConfFromFile(std::string conf_file_path, std::ma } } -std::shared_ptr SQLClusterRouter::GetJobResultSet(int job_id) { - hybridse::sdk::Status status; - +std::shared_ptr SQLClusterRouter::GetJobResultSet(int job_id, + ::hybridse::sdk::Status* status) { std::string db = openmldb::nameserver::INTERNAL_DB; std::string sql = "SELECT * FROM JOB_INFO WHERE id = " + std::to_string(job_id); - auto rs = ExecuteSQLParameterized(db, sql, {}, &status); - if (!status.IsOK()) { + auto rs = ExecuteSQLParameterized(db, sql, {}, status); + if (!status->IsOK()) { return {}; } if (rs->Size() == 0) { - status.code = ::hybridse::common::StatusCode::kCmdError; - status.msg = "Job not found: " + std::to_string(job_id); + status->SetCode(::hybridse::common::StatusCode::kCmdError); + status->SetMsg("Job not found: " + std::to_string(job_id)); return {}; } if (FLAGS_role == "sql_client") { diff --git a/src/sdk/sql_cluster_router.h b/src/sdk/sql_cluster_router.h index 3df7cf9030a..fa44fa1c31b 100644 --- a/src/sdk/sql_cluster_router.h +++ b/src/sdk/sql_cluster_router.h @@ -367,7 +367,7 @@ class SQLClusterRouter : public SQLRouter { const std::string& pattern, hybridse::sdk::Status* status); - std::shared_ptr GetJobResultSet(int job_id); + std::shared_ptr GetJobResultSet(int job_id, ::hybridse::sdk::Status* status); bool CheckTableStatus(const std::string& db, const std::string& table_name, uint32_t tid, const nameserver::TablePartition& partition_info, uint32_t replica_num, From 8ad9f1782d1173a7cc0760260f92adab03008712 Mon Sep 17 00:00:00 2001 From: tobe Date: Thu, 2 Mar 2023 16:29:31 +0800 Subject: [PATCH 2/2] Do not check result set for GetJobResultSet --- src/sdk/sql_cluster_router.cc | 35 +++++------------------------------ 1 file changed, 5 insertions(+), 30 deletions(-) diff --git a/src/sdk/sql_cluster_router.cc b/src/sdk/sql_cluster_router.cc index 95cbcd54f47..ece34d12c7a 100644 --- a/src/sdk/sql_cluster_router.cc +++ b/src/sdk/sql_cluster_router.cc @@ -1838,12 +1838,7 @@ std::shared_ptr SQLClusterRouter::HandleSQLCmd(const h return {}; } - auto rs = this->GetJobResultSet(job_id, status); - if (status->IsOK()) { - return rs; - } else { - return {}; - } + return this->GetJobResultSet(job_id, status); } case hybridse::node::kCmdShowJobLog: { int job_id; @@ -1880,12 +1875,7 @@ std::shared_ptr SQLClusterRouter::HandleSQLCmd(const h ::openmldb::taskmanager::JobInfo job_info; StopJob(job_id, &job_info); - auto rs = this->GetJobResultSet(job_id, status); - if (status->IsOK()) { - return rs; - } else { - return {}; - } + return this->GetJobResultSet(job_id, status); } case hybridse::node::kCmdDropTable: { *status = {}; @@ -2575,12 +2565,7 @@ std::shared_ptr SQLClusterRouter::ExecuteSQL( ReadSparkConfFromFile(std::dynamic_pointer_cast(options_)->spark_conf_path, &config); auto base_status = ExportOfflineData(sql, config, db, is_sync_job, offline_job_timeout, &job_info); if (base_status.OK()) { - auto rs = this->GetJobResultSet(job_info.id(), status); - if (status->IsOK()) { - return rs; - } else { - return {}; - } + return this->GetJobResultSet(job_info.id(), status); } else { *status = {StatusCode::kCmdError, base_status.msg}; } @@ -2634,12 +2619,7 @@ std::shared_ptr SQLClusterRouter::ExecuteSQL( base_status = ImportOfflineData(sql, config, database, is_sync_job, offline_job_timeout, &job_info); } if (base_status.OK() && job_info.id() > 0) { - auto rs = this->GetJobResultSet(job_info.id(), status); - if (status->IsOK()) { - return rs; - } else { - return {}; - } + return this->GetJobResultSet(job_info.id(), status); } else { APPEND_FROM_BASE_AND_WARN(status, base_status, "taskmanager load data failed"); } @@ -2698,12 +2678,7 @@ std::shared_ptr SQLClusterRouter::ExecuteOfflineQuery( return {}; } - auto rs = this->GetJobResultSet(job_info.id(), status); - if (status->IsOK()) { - return rs; - } else { - return {}; - } + return this->GetJobResultSet(job_info.id(), status); } }