Skip to content

Commit

Permalink
[KYUUBI #5861] Generalize TRowSet generator for JDBC engine with dial…
Browse files Browse the repository at this point in the history
…ects

# 🔍 Description
## Issue References 🔗

As described.

## Describe Your Solution 🔧

- Introduced `JdbcTRowSetGenerator` in the JDBC engine, extending the `AbstractTRowSetGenerator` added in #5851.
- Provided `DefaultJdbcTRowSetGenerator` as the default implementation that maps JDBC data types during TRowSet generation.
- Made each JDBC dialect provide a TRowSetGenerator that is, or extends, `DefaultJdbcTRowSetGenerator`, so dialect-specific differences can be handled by overriding.

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklists
## 📝 Author Self Checklist

- [x] My code follows the [style guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html) of this project
- [x] I have performed a self-review
- [x] I have commented my code, particularly in hard-to-understand areas
- [x] I have made corresponding changes to the documentation
- [ ] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my feature works
- [x] New and existing unit tests pass locally with my changes
- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

## 📝 Committer Pre-Merge Checklist

- [ ] Pull request title is okay.
- [ ] No license issues.
- [ ] Milestone correctly set?
- [ ] Test coverage is ok
- [ ] Assignees are selected.
- [ ] Minimum number of approvals
- [ ] No changes are requested

**Be nice. Be informative.**

Closes #5861 from bowenliang123/jdbc-rowgen.

Closes #5861

7f8658d [Bowen Liang] generalize jdbc TRowSet generator

Authored-by: Bowen Liang <[email protected]>
Signed-off-by: liangbowen <[email protected]>
  • Loading branch information
bowenliang123 committed Dec 18, 2023
1 parent acdd74d commit 09febce
Show file tree
Hide file tree
Showing 13 changed files with 211 additions and 386 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import scala.collection.mutable.ArrayBuffer

import org.apache.commons.lang3.StringUtils

import org.apache.kyuubi.engine.jdbc.doris.{DorisRowSetHelper, DorisSchemaHelper}
import org.apache.kyuubi.engine.jdbc.schema.{RowSetHelper, SchemaHelper}
import org.apache.kyuubi.engine.jdbc.doris.{DorisSchemaHelper, DorisTRowSetGenerator}
import org.apache.kyuubi.engine.jdbc.schema.{JdbcTRowSetGenerator, SchemaHelper}
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session

Expand Down Expand Up @@ -120,9 +120,7 @@ class DorisDialect extends JdbcDialect {
query.toString()
}

override def getRowSetHelper(): RowSetHelper = {
new DorisRowSetHelper
}
override def getTRowSetGenerator(): JdbcTRowSetGenerator = new DorisTRowSetGenerator

override def getSchemaHelper(): SchemaHelper = {
new DorisSchemaHelper
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util
import org.apache.kyuubi.{KyuubiException, KyuubiSQLException, Logging}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_JDBC_CONNECTION_URL, ENGINE_JDBC_SHORT_NAME}
import org.apache.kyuubi.engine.jdbc.schema.{RowSetHelper, SchemaHelper}
import org.apache.kyuubi.engine.jdbc.schema.{JdbcTRowSetGenerator, SchemaHelper}
import org.apache.kyuubi.engine.jdbc.util.SupportServiceLoader
import org.apache.kyuubi.operation.Operation
import org.apache.kyuubi.session.Session
Expand Down Expand Up @@ -78,7 +78,7 @@ abstract class JdbcDialect extends SupportServiceLoader with Logging {
throw KyuubiSQLException.featureNotSupported()
}

def getRowSetHelper(): RowSetHelper
def getTRowSetGenerator(): JdbcTRowSetGenerator

def getSchemaHelper(): SchemaHelper
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import scala.collection.mutable.ArrayBuffer

import org.apache.commons.lang3.StringUtils

import org.apache.kyuubi.engine.jdbc.mysql.{MySQLRowSetHelper, MySQLSchemaHelper}
import org.apache.kyuubi.engine.jdbc.schema.{RowSetHelper, SchemaHelper}
import org.apache.kyuubi.engine.jdbc.mysql.{MySQLSchemaHelper, MySQLTRowSetGenerator}
import org.apache.kyuubi.engine.jdbc.schema.{JdbcTRowSetGenerator, SchemaHelper}
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session

Expand Down Expand Up @@ -122,9 +122,7 @@ class MySQLDialect extends JdbcDialect {
query.toString()
}

override def getRowSetHelper(): RowSetHelper = {
new MySQLRowSetHelper
}
override def getTRowSetGenerator(): JdbcTRowSetGenerator = new MySQLTRowSetGenerator

override def getSchemaHelper(): SchemaHelper = {
new MySQLSchemaHelper
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import scala.collection.mutable.ArrayBuffer

import org.apache.commons.lang3.StringUtils

import org.apache.kyuubi.engine.jdbc.phoenix.{PhoenixRowSetHelper, PhoenixSchemaHelper}
import org.apache.kyuubi.engine.jdbc.schema.{RowSetHelper, SchemaHelper}
import org.apache.kyuubi.engine.jdbc.phoenix.PhoenixSchemaHelper
import org.apache.kyuubi.engine.jdbc.schema.{DefaultJdbcTRowSetGenerator, JdbcTRowSetGenerator, SchemaHelper}
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session

Expand Down Expand Up @@ -100,9 +100,7 @@ class PhoenixDialect extends JdbcDialect {
query.toString()
}

override def getRowSetHelper(): RowSetHelper = {
new PhoenixRowSetHelper
}
override def getTRowSetGenerator(): JdbcTRowSetGenerator = new DefaultJdbcTRowSetGenerator

override def getSchemaHelper(): SchemaHelper = {
new PhoenixSchemaHelper
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.commons.lang3.StringUtils

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.engine.jdbc.postgresql.{PostgreSQLRowSetHelper, PostgreSQLSchemaHelper}
import org.apache.kyuubi.engine.jdbc.schema.{RowSetHelper, SchemaHelper}
import org.apache.kyuubi.engine.jdbc.postgresql.{PostgreSQLSchemaHelper, PostgreSQLTRowSetGenerator}
import org.apache.kyuubi.engine.jdbc.schema.{JdbcTRowSetGenerator, SchemaHelper}
import org.apache.kyuubi.operation.Operation
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session
Expand Down Expand Up @@ -181,9 +181,7 @@ class PostgreSQLDialect extends JdbcDialect {
throw KyuubiSQLException.featureNotSupported()
}

override def getRowSetHelper(): RowSetHelper = {
new PostgreSQLRowSetHelper
}
override def getTRowSetGenerator(): JdbcTRowSetGenerator = new PostgreSQLTRowSetGenerator

override def getSchemaHelper(): SchemaHelper = {
new PostgreSQLSchemaHelper
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,20 @@
*/
package org.apache.kyuubi.engine.jdbc.doris

import org.apache.kyuubi.engine.jdbc.schema.RowSetHelper
import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
import org.apache.kyuubi.engine.jdbc.schema.DefaultJdbcTRowSetGenerator
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TColumn, TColumnValue}

class DorisRowSetHelper extends RowSetHelper {
class DorisTRowSetGenerator extends DefaultJdbcTRowSetGenerator {

override def toTinyIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn =
override def toTinyIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
toIntegerTColumn(rows, ordinal)

override def toSmallIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn =
override def toSmallIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
toIntegerTColumn(rows, ordinal)

override def toTinyIntTColumnValue(row: List[Any], ordinal: Int): TColumnValue =
override def toTinyIntTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
toIntegerTColumnValue(row, ordinal)

override def toSmallIntTColumnValue(row: List[Any], ordinal: Int): TColumnValue =
override def toSmallIntTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
toIntegerTColumnValue(row, ordinal)
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,49 +18,49 @@ package org.apache.kyuubi.engine.jdbc.mysql

import java.sql.Types

import org.apache.kyuubi.engine.jdbc.schema.RowSetHelper
import org.apache.kyuubi.engine.jdbc.schema.DefaultJdbcTRowSetGenerator
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TColumn, TColumnValue}

class MySQLRowSetHelper extends RowSetHelper {
class MySQLTRowSetGenerator extends DefaultJdbcTRowSetGenerator {

override def toTinyIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn =
override def toTinyIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
toIntegerTColumn(rows, ordinal)

override def toSmallIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn =
override def toSmallIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
toIntegerTColumn(rows, ordinal)

override def toTinyIntTColumnValue(row: List[Any], ordinal: Int): TColumnValue =
override def toTinyIntTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
toIntegerTColumnValue(row, ordinal)

override def toSmallIntTColumnValue(row: List[Any], ordinal: Int): TColumnValue =
override def toSmallIntTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
toIntegerTColumnValue(row, ordinal)

override protected def toIntegerTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = {
override def toIntegerTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn = {
val colHead = if (rows.isEmpty) None else rows.head(ordinal)
colHead match {
case v: Integer => super.toIntegerTColumn(rows, ordinal)
case v: java.lang.Long => super.toBigIntTColumn(rows, ordinal)
case _: Integer => super.toIntegerTColumn(rows, ordinal)
case _: java.lang.Long => super.toBigIntTColumn(rows, ordinal)
case _ => super.toDefaultTColumn(rows, ordinal, Types.INTEGER)
}
}

override protected def toIntegerTColumnValue(row: List[Any], ordinal: Int): TColumnValue = {
override protected def toIntegerTColumnValue(row: Seq[_], ordinal: Int): TColumnValue = {
row(ordinal) match {
case v: Integer => super.toIntegerTColumnValue(row, ordinal)
case v: java.lang.Long => super.toBigIntTColumnValue(row, ordinal)
case _: Integer => super.toIntegerTColumnValue(row, ordinal)
case _: java.lang.Long => super.toBigIntTColumnValue(row, ordinal)
case _ => super.toDefaultTColumnValue(row, ordinal, Types.INTEGER)
}
}

override protected def toBigIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn = {
override protected def toBigIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn = {
val colHead = if (rows.isEmpty) None else rows.head(ordinal)
colHead match {
case v: java.lang.Long => super.toBigIntTColumn(rows, ordinal)
case _ => super.toDefaultTColumn(rows, ordinal, Types.BIGINT)
}
}

override protected def toBigIntTColumnValue(row: List[Any], ordinal: Int): TColumnValue =
override protected def toBigIntTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
row(ordinal) match {
case v: java.lang.Long => super.toBigIntTColumnValue(row, ordinal)
case _ => super.toDefaultTColumnValue(row, ordinal, Types.BIGINT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,8 @@ abstract class JdbcOperation(session: Session) extends AbstractOperation(session
override protected def afterRun(): Unit = {}

protected def toTRowSet(taken: Iterator[Row]): TRowSet = {
val rowSetHelper = dialect.getRowSetHelper()
rowSetHelper.toTRowSet(
taken.toList.map(_.values),
schema.columns,
getProtocolVersion)
dialect.getTRowSetGenerator()
.toTRowSet(taken.toSeq.map(_.values), schema.columns, getProtocolVersion)
}

override def getResultSetMetadata: TGetResultSetMetadataResp = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
*/
package org.apache.kyuubi.engine.jdbc.postgresql

import org.apache.kyuubi.engine.jdbc.schema.RowSetHelper
import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
import org.apache.kyuubi.engine.jdbc.schema.DefaultJdbcTRowSetGenerator
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TColumn, TColumnValue}

class PostgreSQLRowSetHelper extends RowSetHelper {
class PostgreSQLTRowSetGenerator extends DefaultJdbcTRowSetGenerator {

override def toSmallIntTColumn(rows: Seq[Seq[Any]], ordinal: Int): TColumn =
override def toSmallIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
toIntegerTColumn(rows, ordinal)

override def toSmallIntTColumnValue(row: List[Any], ordinal: Int): TColumnValue =
override protected def toSmallIntTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
toIntegerTColumnValue(row, ordinal)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.jdbc.schema

import java.sql.Date
import java.sql.Types._
import java.time.LocalDateTime
import java.util

import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TTypeId._
import org.apache.kyuubi.util.RowSetUtils.{bitSetToBuffer, formatDate, formatLocalDateTime}

/**
 * Default JDBC implementation of [[JdbcTRowSetGenerator]].
 *
 * Dispatches on the `java.sql.Types` code of a column and delegates to one
 * small `protected` hook per SQL type, so that a dialect can subclass this
 * generator and override only the hooks whose mapping differs. Every SQL type
 * without a dedicated hook is rendered as a string column/value through
 * [[toHiveString]].
 */
class DefaultJdbcTRowSetGenerator extends JdbcTRowSetGenerator {

  /**
   * Builds the column-oriented representation of column `ordinal`.
   *
   * @param rows    all rows of the batch; each row is a sequence of cell values
   * @param ordinal zero-based index of the column inside each row
   * @param sqlType `java.sql.Types` code of the column
   */
  override def toTColumn(rows: Seq[Seq[_]], ordinal: Int, sqlType: Int): TColumn =
    sqlType match {
      case BIT => toBitTColumn(rows, ordinal)
      case TINYINT => toTinyIntTColumn(rows, ordinal)
      case SMALLINT => toSmallIntTColumn(rows, ordinal)
      case INTEGER => toIntegerTColumn(rows, ordinal)
      case BIGINT => toBigIntTColumn(rows, ordinal)
      case REAL => toRealTColumn(rows, ordinal)
      case DOUBLE => toDoubleTColumn(rows, ordinal)
      case CHAR => toCharTColumn(rows, ordinal)
      case VARCHAR => toVarcharTColumn(rows, ordinal)
      case _ => toDefaultTColumn(rows, ordinal, sqlType)
    }

  /**
   * Builds the row-oriented representation of a single cell.
   *
   * @param ordinal zero-based index of the cell inside `row`
   * @param row     the row holding the cell
   * @param types   column descriptors; the SQL type of the cell is resolved
   *                from them with `getColumnType`
   */
  override def toTColumnValue(ordinal: Int, row: Seq[_], types: Seq[Column]): TColumnValue =
    getColumnType(types, ordinal) match {
      case BIT => toBitTColumnValue(row, ordinal)
      case TINYINT => toTinyIntTColumnValue(row, ordinal)
      case SMALLINT => toSmallIntTColumnValue(row, ordinal)
      case INTEGER => toIntegerTColumnValue(row, ordinal)
      case BIGINT => toBigIntTColumnValue(row, ordinal)
      case REAL => toRealTColumnValue(row, ordinal)
      case DOUBLE => toDoubleTColumnValue(row, ordinal)
      case CHAR => toCharTColumnValue(row, ordinal)
      case VARCHAR => toVarcharTColumnValue(row, ordinal)
      case otherType => toDefaultTColumnValue(row, ordinal, otherType)
    }

  /**
   * Fallback column builder: renders every cell as a string.
   *
   * A null cell is recorded in the null bitmap and stored as the empty string;
   * any other cell is formatted with [[toHiveString]].
   */
  protected def toDefaultTColumn(rows: Seq[Seq[_]], ordinal: Int, sqlType: Int): TColumn = {
    val nulls = new java.util.BitSet()
    val values = new util.ArrayList[String](rows.length)
    // Walk the rows with an iterator instead of `rows(i)`: the incoming Seq may
    // be a linear sequence (e.g. a List), where indexed access is O(i) and would
    // make this loop quadratic.
    val rowIter = rows.iterator
    var i = 0
    while (rowIter.hasNext) {
      // Read the cell once; on a linear row each `row(ordinal)` is itself O(ordinal).
      val cell = rowIter.next()(ordinal)
      if (cell == null) {
        nulls.set(i)
        values.add("")
      } else {
        values.add(toHiveString(cell, sqlType))
      }
      i += 1
    }
    // NOTE(review): the BitSet is passed where TStringColumn presumably expects a
    // buffer; this relies on the imported `bitSetToBuffer` being an implicit
    // conversion, exactly as the previous implementation did.
    TColumn.stringVal(new TStringColumn(values, nulls))
  }

  // ----------------------------------------------------------
  // Column-oriented hooks, one per dedicated SQL type
  // ----------------------------------------------------------

  protected def toBitTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
    toTTypeColumn(BOOLEAN_TYPE, rows, ordinal)

  protected def toTinyIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
    toTTypeColumn(TINYINT_TYPE, rows, ordinal)

  protected def toSmallIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
    toTTypeColumn(SMALLINT_TYPE, rows, ordinal)

  protected def toIntegerTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
    toTTypeColumn(INT_TYPE, rows, ordinal)

  protected def toBigIntTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
    toTTypeColumn(BIGINT_TYPE, rows, ordinal)

  protected def toRealTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
    toTTypeColumn(FLOAT_TYPE, rows, ordinal)

  protected def toDoubleTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
    toTTypeColumn(DOUBLE_TYPE, rows, ordinal)

  // CHAR is emitted as a string column, consistent with toCharTColumnValue and
  // toVarcharTColumn, which both use STRING_TYPE. The previous CHAR_TYPE made the
  // columnar path disagree with the row-based path for the same SQL type.
  protected def toCharTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
    toTTypeColumn(STRING_TYPE, rows, ordinal)

  protected def toVarcharTColumn(rows: Seq[Seq[_]], ordinal: Int): TColumn =
    toTTypeColumn(STRING_TYPE, rows, ordinal)

  // ----------------------------------------------------------
  // Row-oriented hooks, one per dedicated SQL type
  // ----------------------------------------------------------

  protected def toBitTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
    toTTypeColumnVal(BOOLEAN_TYPE, row, ordinal)

  protected def toTinyIntTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
    toTTypeColumnVal(TINYINT_TYPE, row, ordinal)

  protected def toSmallIntTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
    toTTypeColumnVal(SMALLINT_TYPE, row, ordinal)

  protected def toIntegerTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
    toTTypeColumnVal(INT_TYPE, row, ordinal)

  protected def toBigIntTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
    toTTypeColumnVal(BIGINT_TYPE, row, ordinal)

  protected def toRealTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
    toTTypeColumnVal(FLOAT_TYPE, row, ordinal)

  protected def toDoubleTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
    toTTypeColumnVal(DOUBLE_TYPE, row, ordinal)

  protected def toCharTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
    toTTypeColumnVal(STRING_TYPE, row, ordinal)

  protected def toVarcharTColumnValue(row: Seq[_], ordinal: Int): TColumnValue =
    toTTypeColumnVal(STRING_TYPE, row, ordinal)

  /**
   * Fallback cell builder: renders the cell as a string value.
   *
   * A null cell yields a `TStringValue` whose value is left unset.
   */
  protected def toDefaultTColumnValue(row: Seq[_], ordinal: Int, sqlType: Int): TColumnValue = {
    val tStrValue = new TStringValue
    val cell = row(ordinal)
    if (cell != null) {
      tStrValue.setValue(toHiveString(cell, sqlType))
    }
    TColumnValue.stringVal(tStrValue)
  }

  /**
   * Formats a non-null cell as text, based on its runtime class and SQL type.
   *
   * Dates, local date-times, decimals and big integers get dedicated
   * formatting when the runtime class and `sqlType` agree; everything else
   * falls back to `toString`.
   *
   * @param data    the cell value; callers in this class never pass null, and
   *                the fallback branch would throw on a null value
   * @param sqlType `java.sql.Types` code of the column
   */
  protected def toHiveString(data: Any, sqlType: Int): String =
    (data, sqlType) match {
      case (date: Date, DATE) => formatDate(date)
      case (dateTime: LocalDateTime, TIMESTAMP) => formatLocalDateTime(dateTime)
      // toPlainString avoids scientific notation for decimals.
      case (decimal: java.math.BigDecimal, DECIMAL) => decimal.toPlainString
      case (bigint: java.math.BigInteger, BIGINT) => bigint.toString()
      case (other, _) => other.toString
    }
}
Loading

0 comments on commit 09febce

Please sign in to comment.