Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: refine openmldb batch log and offline job info with SQL #3640

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@
* @return
*/
def openmldbSql(sqlText: String): OpenmldbDataframe = {
logger.info("Try to execute OpenMLDB SQL: " + sqlText)

if (config.enableSparksql) {
return OpenmldbDataframe(this, sparksql(sqlText))
}
Expand Down Expand Up @@ -278,7 +280,15 @@
def close(): Unit = stop()

def registerOpenmldbOfflineTable(catalogService: OpenmldbCatalogService): Unit = {
if (catalogService == null) {
return

Check warning on line 284 in java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/api/OpenmldbSession.scala

View check run for this annotation

Codecov / codecov/patch

java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/api/OpenmldbSession.scala#L284

Added line #L284 was not covered by tests
}

val databases = catalogService.getDatabases
if (databases == null) {
return

Check warning on line 289 in java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/api/OpenmldbSession.scala

View check run for this annotation

Codecov / codecov/patch

java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/api/OpenmldbSession.scala#L289

Added line #L289 was not covered by tests
}

databases.map(dbName => {
val tableInfos = catalogService.getTableInfos(dbName)
tableInfos.map(tableInfo => {
Expand Down Expand Up @@ -323,7 +333,7 @@
}
} catch {
case e: Exception => {
logger.warn(s"Fail to register table $dbName.$tableName " + ExceptionUtils.getStackTrace(e))
logger.warn(s"Fail to register table $dbName.$tableName, exception: " + ExceptionUtils.getStackTrace(e))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@

if (TaskManagerConfig.isK8s) {
val args = List(sql)
K8sJobManager.submitSparkJob(jobType, mainClass, args, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap,
K8sJobManager.submitSparkJob(jobType, mainClass, args, sql, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap,

Check warning on line 60 in java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/OpenmldbBatchjobManager.scala

View check run for this annotation

Codecov / codecov/patch

java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/OpenmldbBatchjobManager.scala#L60

Added line #L60 was not covered by tests
defaultDb)
} else {
SparkJobManager.submitSparkJob(jobType, mainClass, args, tempSqlFile.getAbsolutePath,
SparkJobManager.submitSparkJob(jobType, mainClass, args, sql, tempSqlFile.getAbsolutePath,

Check warning on line 63 in java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/OpenmldbBatchjobManager.scala

View check run for this annotation

Codecov / codecov/patch

java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/OpenmldbBatchjobManager.scala#L63

Added line #L63 was not covered by tests
sparkConf.asScala.toMap, defaultDb, blocking = true)
}
}
Expand All @@ -73,11 +73,11 @@

if (TaskManagerConfig.isK8s) {
val args = List(sql)
K8sJobManager.submitSparkJob(jobType, mainClass, args, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap,
K8sJobManager.submitSparkJob(jobType, mainClass, args, sql, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap,

Check warning on line 76 in java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/OpenmldbBatchjobManager.scala

View check run for this annotation

Codecov / codecov/patch

java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/OpenmldbBatchjobManager.scala#L76

Added line #L76 was not covered by tests
defaultDb)
} else {
val args = List(tempSqlFile.getAbsolutePath)
SparkJobManager.submitSparkJob(jobType, mainClass, args, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap,
SparkJobManager.submitSparkJob(jobType, mainClass, args, sql, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap,

Check warning on line 80 in java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/OpenmldbBatchjobManager.scala

View check run for this annotation

Codecov / codecov/patch

java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/OpenmldbBatchjobManager.scala#L80

Added line #L80 was not covered by tests
defaultDb)
}
}
Expand All @@ -90,11 +90,11 @@

if (TaskManagerConfig.isK8s) {
val args = List(sql)
K8sJobManager.submitSparkJob(jobType, mainClass, args, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap,
K8sJobManager.submitSparkJob(jobType, mainClass, args, sql, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap,

Check warning on line 93 in java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/OpenmldbBatchjobManager.scala

View check run for this annotation

Codecov / codecov/patch

java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/OpenmldbBatchjobManager.scala#L93

Added line #L93 was not covered by tests
defaultDb)
} else {
val args = List(tempSqlFile.getAbsolutePath)
SparkJobManager.submitSparkJob(jobType, mainClass, args, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap,
SparkJobManager.submitSparkJob(jobType, mainClass, args, sql, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap,

Check warning on line 97 in java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/OpenmldbBatchjobManager.scala

View check run for this annotation

Codecov / codecov/patch

java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/OpenmldbBatchjobManager.scala#L97

Added line #L97 was not covered by tests
defaultDb)
}
}
Expand All @@ -107,11 +107,11 @@

if (TaskManagerConfig.isK8s) {
val args = List(sql)
K8sJobManager.submitSparkJob(jobType, mainClass, args, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap,
K8sJobManager.submitSparkJob(jobType, mainClass, args, sql, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap,

Check warning on line 110 in java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/OpenmldbBatchjobManager.scala

View check run for this annotation

Codecov / codecov/patch

java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/OpenmldbBatchjobManager.scala#L110

Added line #L110 was not covered by tests
defaultDb)
} else {
val args = List(tempSqlFile.getAbsolutePath)
SparkJobManager.submitSparkJob(jobType, mainClass, args, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap,
SparkJobManager.submitSparkJob(jobType, mainClass, args, sql, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap,

Check warning on line 114 in java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/OpenmldbBatchjobManager.scala

View check run for this annotation

Codecov / codecov/patch

java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/OpenmldbBatchjobManager.scala#L114

Added line #L114 was not covered by tests
defaultDb)
}
}
Expand All @@ -124,11 +124,11 @@

if (TaskManagerConfig.isK8s) {
val args = List(sql)
K8sJobManager.submitSparkJob(jobType, mainClass, args, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap,
K8sJobManager.submitSparkJob(jobType, mainClass, args, sql, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap,

Check warning on line 127 in java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/OpenmldbBatchjobManager.scala

View check run for this annotation

Codecov / codecov/patch

java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/OpenmldbBatchjobManager.scala#L127

Added line #L127 was not covered by tests
defaultDb)
} else {
val args = List(tempSqlFile.getAbsolutePath)
SparkJobManager.submitSparkJob(jobType, mainClass, args, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap,
SparkJobManager.submitSparkJob(jobType, mainClass, args, sql, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap,

Check warning on line 131 in java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/OpenmldbBatchjobManager.scala

View check run for this annotation

Codecov / codecov/patch

java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/OpenmldbBatchjobManager.scala#L131

Added line #L131 was not covered by tests
defaultDb)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,18 @@

def submitSparkJob(jobType: String, mainClass: String,
args: List[String] = List(),
sql: String = "",

Check warning on line 47 in java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/k8s/K8sJobManager.scala

View check run for this annotation

Codecov / codecov/patch

java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/k8s/K8sJobManager.scala#L47

Added line #L47 was not covered by tests
localSqlFile: String = "",
sparkConf: Map[String, String] = Map(),
defaultDb: String = "",
blocking: Boolean = false): JobInfo = {

val jobInfo = JobInfoManager.createJobInfo(jobType, args, sparkConf)
val jobInfoArgs = if (sql.nonEmpty) {
List(sql)

Check warning on line 54 in java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/k8s/K8sJobManager.scala

View check run for this annotation

Codecov / codecov/patch

java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/k8s/K8sJobManager.scala#L54

Added line #L54 was not covered by tests
} else {
args

Check warning on line 56 in java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/k8s/K8sJobManager.scala

View check run for this annotation

Codecov / codecov/patch

java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/k8s/K8sJobManager.scala#L56

Added line #L56 was not covered by tests
}
val jobInfo = JobInfoManager.createJobInfo(jobType, jobInfoArgs, sparkConf)

Check warning on line 58 in java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/k8s/K8sJobManager.scala

View check run for this annotation

Codecov / codecov/patch

java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/k8s/K8sJobManager.scala#L58

Added line #L58 was not covered by tests

val jobName = getK8sJobName(jobInfo.getId)
jobInfo.setApplicationId(jobName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,18 @@

def submitSparkJob(jobType: String, mainClass: String,
args: List[String] = List(),
sql: String = "",

Check warning on line 80 in java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/spark/SparkJobManager.scala

View check run for this annotation

Codecov / codecov/patch

java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/spark/SparkJobManager.scala#L80

Added line #L80 was not covered by tests
localSqlFile: String = "",
sparkConf: Map[String, String] = Map(),
defaultDb: String = "",
blocking: Boolean = false): JobInfo = {
val jobInfo = JobInfoManager.createJobInfo(jobType, args, sparkConf)

val jobInfoArgs = if (sql.nonEmpty) {
List(sql)

Check warning on line 87 in java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/spark/SparkJobManager.scala

View check run for this annotation

Codecov / codecov/patch

java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/spark/SparkJobManager.scala#L87

Added line #L87 was not covered by tests
} else {
args

Check warning on line 89 in java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/spark/SparkJobManager.scala

View check run for this annotation

Codecov / codecov/patch

java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/spark/SparkJobManager.scala#L89

Added line #L89 was not covered by tests
}
val jobInfo = JobInfoManager.createJobInfo(jobType, jobInfoArgs, sparkConf)

Check warning on line 91 in java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/spark/SparkJobManager.scala

View check run for this annotation

Codecov / codecov/patch

java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/spark/SparkJobManager.scala#L91

Added line #L91 was not covered by tests

// Submit Spark application with SparkLauncher
val launcher = createSparkLauncher(mainClass)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class TestSparkJobManager extends FunSuite {
val jobType = "DummySparkApp"
val sparkConf = Map(SparkLauncher.DRIVER_EXTRA_CLASSPATH -> System.getProperty("java.class.path"))

SparkJobManager.submitSparkJob(jobType, mainClass, List[String](), "", sparkConf)
SparkJobManager.submitSparkJob(jobType, mainClass, List[String](), "", "", sparkConf)

JobInfoManager.getAllJobs().map(println)
Thread.sleep(5000)
Expand Down
Loading