Skip to content

Commit

Permalink
nit
Browse files Browse the repository at this point in the history
  • Loading branch information
pan3793 committed Sep 7, 2022
1 parent bb85d2b commit 742bdbe
Showing 1 changed file with 41 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -180,46 +180,45 @@ class KyuubiOperationPerUserSuite
}

test("support to interrupt the thrift request if remote engine is broken") {
if (!httpMode) {
withSessionConf(Map(
KyuubiConf.ENGINE_ALIVE_PROBE_ENABLED.key -> "true",
KyuubiConf.ENGINE_ALIVE_PROBE_INTERVAL.key -> "1000",
KyuubiConf.ENGINE_ALIVE_TIMEOUT.key -> "1000"))(Map.empty)(
Map.empty) {
withSessionHandle { (client, handle) =>
val preReq = new TExecuteStatementReq()
preReq.setStatement("select engine_name()")
preReq.setSessionHandle(handle)
preReq.setRunAsync(false)
client.ExecuteStatement(preReq)

val sessionHandle = SessionHandle(handle)
val session = server.backendService.sessionManager.asInstanceOf[KyuubiSessionManager]
.getSession(sessionHandle).asInstanceOf[KyuubiSessionImpl]
session.client.getEngineAliveProbeProtocol.foreach(_.getTransport.close())

val exitReq = new TExecuteStatementReq()
exitReq.setStatement("SELECT java_method('java.lang.Thread', 'sleep', 1000L)," +
"java_method('java.lang.System', 'exit', 1)")
exitReq.setSessionHandle(handle)
exitReq.setRunAsync(true)
client.ExecuteStatement(exitReq)

val executeStmtReq = new TExecuteStatementReq()
executeStmtReq.setStatement("SELECT java_method('java.lang.Thread', 'sleep', 30000l)")
executeStmtReq.setSessionHandle(handle)
executeStmtReq.setRunAsync(false)
val startTime = System.currentTimeMillis()
val executeStmtResp = client.ExecuteStatement(executeStmtReq)
assert(executeStmtResp.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
assert(executeStmtResp.getStatus.getErrorMessage.contains(
"java.net.SocketException: Connection reset") ||
executeStmtResp.getStatus.getErrorMessage.contains(
"Caused by: java.net.SocketException: Broken pipe (Write failed)"))
val elapsedTime = System.currentTimeMillis() - startTime
assert(elapsedTime < 20 * 1000)
assert(session.client.asyncRequestInterrupted)
}
assume(!httpMode)
withSessionConf(Map(
KyuubiConf.ENGINE_ALIVE_PROBE_ENABLED.key -> "true",
KyuubiConf.ENGINE_ALIVE_PROBE_INTERVAL.key -> "1000",
KyuubiConf.ENGINE_ALIVE_TIMEOUT.key -> "1000"))(Map.empty)(
Map.empty) {
withSessionHandle { (client, handle) =>
val preReq = new TExecuteStatementReq()
preReq.setStatement("select engine_name()")
preReq.setSessionHandle(handle)
preReq.setRunAsync(false)
client.ExecuteStatement(preReq)

val sessionHandle = SessionHandle(handle)
val session = server.backendService.sessionManager.asInstanceOf[KyuubiSessionManager]
.getSession(sessionHandle).asInstanceOf[KyuubiSessionImpl]
session.client.getEngineAliveProbeProtocol.foreach(_.getTransport.close())

val exitReq = new TExecuteStatementReq()
exitReq.setStatement("SELECT java_method('java.lang.Thread', 'sleep', 1000L)," +
"java_method('java.lang.System', 'exit', 1)")
exitReq.setSessionHandle(handle)
exitReq.setRunAsync(true)
client.ExecuteStatement(exitReq)

val executeStmtReq = new TExecuteStatementReq()
executeStmtReq.setStatement("SELECT java_method('java.lang.Thread', 'sleep', 30000l)")
executeStmtReq.setSessionHandle(handle)
executeStmtReq.setRunAsync(false)
val startTime = System.currentTimeMillis()
val executeStmtResp = client.ExecuteStatement(executeStmtReq)
assert(executeStmtResp.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
assert(executeStmtResp.getStatus.getErrorMessage.contains(
"java.net.SocketException: Connection reset") ||
executeStmtResp.getStatus.getErrorMessage.contains(
"Caused by: java.net.SocketException: Broken pipe (Write failed)"))
val elapsedTime = System.currentTimeMillis() - startTime
assert(elapsedTime < 20 * 1000)
assert(session.client.asyncRequestInterrupted)
}
}
}
Expand Down Expand Up @@ -257,6 +256,7 @@ class KyuubiOperationPerUserSuite
}

test("server info provider - server") {
assume(!httpMode)
withSessionConf(Map(KyuubiConf.SERVER_INFO_PROVIDER.key -> "SERVER"))()() {
withSessionHandle { (client, handle) =>
val req = new TGetInfoReq()
Expand All @@ -268,6 +268,7 @@ class KyuubiOperationPerUserSuite
}

test("server info provider - engine") {
assume(!httpMode)
withSessionConf(Map(KyuubiConf.SERVER_INFO_PROVIDER.key -> "ENGINE"))()() {
withSessionHandle { (client, handle) =>
val req = new TGetInfoReq()
Expand Down

0 comments on commit 742bdbe

Please sign in to comment.