Skip to content

Commit

Permalink
feat: get job fail status for show job commands (#3112)
Browse files Browse the repository at this point in the history
* Get job fail status for show job commands

* Do not check result set for GetJobResultSet
  • Loading branch information
tobegit3hub authored Mar 3, 2023
1 parent 034f8dd commit df89712
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 13 deletions.
26 changes: 14 additions & 12 deletions src/sdk/sql_cluster_router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1845,7 +1845,8 @@ std::shared_ptr<hybridse::sdk::ResultSet> SQLClusterRouter::HandleSQLCmd(const h
*status = {StatusCode::kCmdError, "Failed to parse job id: " + cmd_node->GetArgs()[0]};
return {};
}
return this->GetJobResultSet(job_id);

return this->GetJobResultSet(job_id, status);
}
case hybridse::node::kCmdShowJobLog: {
int job_id;
Expand Down Expand Up @@ -1881,7 +1882,8 @@ std::shared_ptr<hybridse::sdk::ResultSet> SQLClusterRouter::HandleSQLCmd(const h

::openmldb::taskmanager::JobInfo job_info;
StopJob(job_id, &job_info);
return this->GetJobResultSet(job_id);

return this->GetJobResultSet(job_id, status);
}
case hybridse::node::kCmdDropTable: {
*status = {};
Expand Down Expand Up @@ -2571,7 +2573,7 @@ std::shared_ptr<hybridse::sdk::ResultSet> SQLClusterRouter::ExecuteSQL(
ReadSparkConfFromFile(std::dynamic_pointer_cast<SQLRouterOptions>(options_)->spark_conf_path, &config);
auto base_status = ExportOfflineData(sql, config, db, is_sync_job, offline_job_timeout, &job_info);
if (base_status.OK()) {
return this->GetJobResultSet(job_info.id());
return this->GetJobResultSet(job_info.id(), status);
} else {
*status = {StatusCode::kCmdError, base_status.msg};
}
Expand Down Expand Up @@ -2625,7 +2627,7 @@ std::shared_ptr<hybridse::sdk::ResultSet> SQLClusterRouter::ExecuteSQL(
base_status = ImportOfflineData(sql, config, database, is_sync_job, offline_job_timeout, &job_info);
}
if (base_status.OK() && job_info.id() > 0) {
return this->GetJobResultSet(job_info.id());
return this->GetJobResultSet(job_info.id(), status);
} else {
APPEND_FROM_BASE_AND_WARN(status, base_status, "taskmanager load data failed");
}
Expand Down Expand Up @@ -2683,7 +2685,8 @@ std::shared_ptr<hybridse::sdk::ResultSet> SQLClusterRouter::ExecuteOfflineQuery(
APPEND_FROM_BASE_AND_WARN(status, base_status, "async offline query failed");
return {};
}
return this->GetJobResultSet(job_info.id());

return this->GetJobResultSet(job_info.id(), status);
}
}

Expand Down Expand Up @@ -4199,19 +4202,18 @@ void SQLClusterRouter::ReadSparkConfFromFile(std::string conf_file_path, std::ma
}
}

std::shared_ptr<hybridse::sdk::ResultSet> SQLClusterRouter::GetJobResultSet(int job_id) {
hybridse::sdk::Status status;

std::shared_ptr<hybridse::sdk::ResultSet> SQLClusterRouter::GetJobResultSet(int job_id,
::hybridse::sdk::Status* status) {
std::string db = openmldb::nameserver::INTERNAL_DB;
std::string sql = "SELECT * FROM JOB_INFO WHERE id = " + std::to_string(job_id);

auto rs = ExecuteSQLParameterized(db, sql, {}, &status);
if (!status.IsOK()) {
auto rs = ExecuteSQLParameterized(db, sql, {}, status);
if (!status->IsOK()) {
return {};
}
if (rs->Size() == 0) {
status.code = ::hybridse::common::StatusCode::kCmdError;
status.msg = "Job not found: " + std::to_string(job_id);
status->SetCode(::hybridse::common::StatusCode::kCmdError);
status->SetMsg("Job not found: " + std::to_string(job_id));
return {};
}
if (FLAGS_role == "sql_client") {
Expand Down
2 changes: 1 addition & 1 deletion src/sdk/sql_cluster_router.h
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ class SQLClusterRouter : public SQLRouter {
const std::string& pattern,
hybridse::sdk::Status* status);

std::shared_ptr<hybridse::sdk::ResultSet> GetJobResultSet(int job_id);
std::shared_ptr<hybridse::sdk::ResultSet> GetJobResultSet(int job_id, ::hybridse::sdk::Status* status);

bool CheckTableStatus(const std::string& db, const std::string& table_name, uint32_t tid,
const nameserver::TablePartition& partition_info, uint32_t replica_num,
Expand Down

0 comments on commit df89712

Please sign in to comment.