diff --git a/pinot-connectors/pinot-spark-2-connector/README.md b/pinot-connectors/pinot-spark-2-connector/README.md index 771de871ae9b..48fcd587a784 100644 --- a/pinot-connectors/pinot-spark-2-connector/README.md +++ b/pinot-connectors/pinot-spark-2-connector/README.md @@ -20,7 +20,7 @@ --> # Spark-Pinot Connector -Spark-pinot connector to read and write data from/to Pinot. +Spark-pinot connector to read data from Pinot. Detailed read model documentation is here; [Spark-Pinot Connector Read Model](documentation/read_model.md) @@ -57,7 +57,23 @@ val data = spark.read data.show(100) ``` -For more examples, see `src/test/scala/example/ExampleSparkPinotConnectorTest.scala` +## Examples + +There are more examples included in `src/test/scala/.../ExampleSparkPinotConnectorTest.scala`. +You can run the examples locally (e.g. using your IDE) in standalone mode by starting a local Pinot cluster. See: https://docs.pinot.apache.org/basics/getting-started/running-pinot-locally + +You can also run the tests in _cluster mode_ using following command: +```shell +export SPARK_CLUSTER= + +# Edit the ExampleSparkPinotConnectorTest to get rid of `.master("local")` and rebuild the jar before running this command +spark-submit \ + --class org.apache.pinot.connector.spark.datasource.ExampleSparkPinotConnectorTest \ + --jars ./target/pinot-spark-2-connector-0.13.0-SNAPSHOT-shaded.jar \ + --master $SPARK_CLUSTER \ + --deploy-mode cluster \ + ./target/pinot-spark-2-connector-0.13.0-SNAPSHOT-tests.jar +``` Spark-Pinot connector uses Spark `DatasourceV2 API`. Please check the Databricks presentation for DatasourceV2 API; diff --git a/pinot-connectors/pinot-spark-2-connector/documentation/read_model.md b/pinot-connectors/pinot-spark-2-connector/documentation/read_model.md index 547bd9eda371..aa7a2f0e4596 100644 --- a/pinot-connectors/pinot-spark-2-connector/documentation/read_model.md +++ b/pinot-connectors/pinot-spark-2-connector/documentation/read_model.md @@ -74,7 +74,7 @@ If `segmentsPerSplit` is equal to 1, there will be created 6 Spark partition; If `segmentsPerSplit` value is too low, that means more parallelism. But this also mean that a lot of connection will be opened with Pinot servers, and will increase QPS on the Pinot servers. -If `segmetnsPerSplit` value is too high, that means less parallelism. Each Pinot server will scan more segments per request. +If `segmentsPerSplit` value is too high, that means less parallelism. Each Pinot server will scan more segments per request. **Note:** Pinot servers prunes segments based on the segment metadata when query comes. In some cases(for example filtering based on the some columns), some servers may not return data. Therefore, some Spark partitions will be empty. In this cases, `repartition()` may be applied for efficient data analysis after loading data to Spark. @@ -120,7 +120,7 @@ val df = spark.read .load() .filter($"DestStateName" === "Florida") .filter($"Origin" === "ORD") - .select($"Carrier") + .select($"DestStateName", $"Origin", $"Carrier") ``` - Offline query: `select DestStateName, Origin, Carrier from airlineStats_OFFLINE where DestStateName = 'Florida and Origin = 'ORD' LIMIT {Int.MaxValue}` @@ -137,6 +137,4 @@ val df = spark.read | usePushDownFilters | Push filters to pinot servers or not. If true, data exchange between pinot server and spark will be minimized. 
| No | true | | segmentsPerSplit | Represents the maximum segment count that will be scanned by pinot server in one connection | No | 3 | | pinotServerTimeoutMs | The maximum timeout(ms) to get data from pinot server | No | 10 mins | - - - \ No newline at end of file +| useGrpcServer | Boolean value to enable reads via gRPC. This option is more memory efficient both on Pinot server and Spark executor side because it utilizes streaming. Requires gRPC to be enabled on Pinot server. | No | false | diff --git a/pinot-connectors/pinot-spark-3-connector/README.md b/pinot-connectors/pinot-spark-3-connector/README.md new file mode 100644 index 000000000000..afebc5dfde7d --- /dev/null +++ b/pinot-connectors/pinot-spark-3-connector/README.md @@ -0,0 +1,85 @@ + +# Spark-Pinot Connector + +Spark-pinot connector to read data from Pinot. + +Detailed read model documentation is here; [Spark-Pinot Connector Read Model](documentation/read_model.md) + + +## Features +- Query realtime, offline or hybrid tables +- Distributed, parallel scan +- Streaming reads using gRPC (optional) +- SQL support instead of PQL +- Column and filter push down to optimize performance +- Overlap between realtime and offline segments is queried exactly once for hybrid tables +- Schema discovery + - Dynamic inference + - Static analysis of case class + +## Quick Start +```scala +import org.apache.spark.sql.SparkSession + +val spark: SparkSession = SparkSession + .builder() + .appName("spark-pinot-connector-test") + .master("local") + .getOrCreate() + +import spark.implicits._ + +val data = spark.read + .format("pinot") + .option("table", "airlineStats") + .option("tableType", "offline") + .load() + .filter($"DestStateName" === "Florida") + +data.show(100) +``` + +## Examples + +There are more examples included in `src/test/scala/.../ExampleSparkPinotConnectorTest.scala`. +You can run the examples locally (e.g. using your IDE) in standalone mode by starting a local Pinot cluster. See: https://docs.pinot.apache.org/basics/getting-started/running-pinot-locally + +You can also run the tests in _cluster mode_ using following command: +```shell +export SPARK_CLUSTER= + +# Edit the ExampleSparkPinotConnectorTest to get rid of `.master("local")` and rebuild the jar before running this command +spark-submit \ + --class org.apache.pinot.connector.spark.v3.datasource.ExampleSparkPinotConnectorTest \ + --jars ./target/pinot-spark-3-connector-0.13.0-SNAPSHOT-shaded.jar \ + --master $SPARK_CLUSTER \ + --deploy-mode cluster \ + ./target/pinot-spark-3-connector-0.13.0-SNAPSHOT-tests.jar +``` + +Spark-Pinot connector uses Spark `DatasourceV2 API`. 
Please check the Databricks presentation for the DatasourceV2 API:
+
+https://www.slideshare.net/databricks/apache-spark-data-source-v2-with-wenchen-fan-and-gengliang-wang
+
+## Future Works
+- Add integration tests for read operation
+- Add write support (Pinot segment write logic will change in later versions of Pinot)
diff --git a/pinot-connectors/pinot-spark-3-connector/documentation/images/spark-pinot-connector-executor-server-interaction.jpg b/pinot-connectors/pinot-spark-3-connector/documentation/images/spark-pinot-connector-executor-server-interaction.jpg
new file mode 100644
index 000000000000..dc3b88ae68f2
Binary files /dev/null and b/pinot-connectors/pinot-spark-3-connector/documentation/images/spark-pinot-connector-executor-server-interaction.jpg differ
diff --git a/pinot-connectors/pinot-spark-3-connector/documentation/read_model.md b/pinot-connectors/pinot-spark-3-connector/documentation/read_model.md
new file mode 100644
index 000000000000..aa7a2f0e4596
--- /dev/null
+++ b/pinot-connectors/pinot-spark-3-connector/documentation/read_model.md
@@ -0,0 +1,140 @@
+
+# Read Model
+
+The connector can scan offline, realtime and hybrid tables. The two base parameters, `table` and `tableType`, have to be given as below:
+- For an offline table: `table: tbl`, `tableType: OFFLINE or offline`
+- For a realtime table: `table: tbl`, `tableType: REALTIME or realtime`
+- For a hybrid table: `table: tbl`, `tableType: HYBRID or hybrid`
+
+An example scan:
+
+```scala
+val df = spark.read
+  .format("pinot")
+  .option("table", "airlineStats")
+  .option("tableType", "offline")
+  .load()
+```
+
+A custom schema can be specified directly. If no schema is specified, the connector reads the table schema from the Pinot controller and converts it to a Spark schema.
+
+### Architecture
+
+The connector reads data from `Pinot Servers` directly. To do this, it first creates a query with the given columns and filters (if filter push down is enabled), then finds the routing table for the created query. Based on the routing table and `segmentsPerSplit` (explained in detail below), it creates Pinot splits that contain **ONE PINOT SERVER and ONE OR MORE SEGMENTS per Spark partition**. Lastly, each partition reads data from its assigned Pinot server in parallel.
+
+![Spark-Pinot Connector Architecture](images/spark-pinot-connector-executor-server-interaction.jpg)
+
+Each Spark partition opens a connection with a Pinot server and reads data.
For example, assume that the routing table information for a given query is as follows:
+
+```
+- realtime ->
+  - realtimeServer1 -> (segment1, segment2, segment3)
+  - realtimeServer2 -> (segment4)
+- offline ->
+  - offlineServer10 -> (segment10, segment20)
+```
+
+If `segmentsPerSplit` is equal to 3, 3 Spark partitions will be created as below:
+
+| Spark Partition | Queried Pinot Server/Segments |
+| ------------- | ------------- |
+| partition1 | realtimeServer1 / segment1, segment2, segment3 |
+| partition2 | realtimeServer2 / segment4 |
+| partition3 | offlineServer10 / segment10, segment20 |
+
+If `segmentsPerSplit` is equal to 1, 6 Spark partitions will be created:
+
+| Spark Partition | Queried Pinot Server/Segments |
+| ------------- | ------------- |
+| partition1 | realtimeServer1 / segment1 |
+| partition2 | realtimeServer1 / segment2 |
+| partition3 | realtimeServer1 / segment3 |
+| partition4 | realtimeServer2 / segment4 |
+| partition5 | offlineServer10 / segment10 |
+| partition6 | offlineServer10 / segment20 |
+
+A lower `segmentsPerSplit` value means more parallelism, but it also means many more connections will be opened to the Pinot servers, which increases the QPS on them.
+
+A higher `segmentsPerSplit` value means less parallelism; each Pinot server will scan more segments per request.
+
+**Note:** Pinot servers prune segments based on segment metadata when a query arrives. In some cases (for example, when filtering on certain columns), some servers may not return any data, so some Spark partitions will be empty. In such cases, `repartition()` may be applied for efficient data analysis after loading the data into Spark.
+
+### Filter And Column Push Down
+The connector supports filter and column push down. Filters and columns are pushed to the Pinot servers. This improves read performance because it minimizes the data transfer between Pinot and Spark. Filter push down is enabled by default. If filters should instead be applied in Spark, `usePushDownFilters` should be set to `false`.
+
+The connector uses SQL, so all SQL filters are supported.
+
+### Segment Pruning
+
+The connector fetches the routing table of the given query to learn which Pinot servers will be queried and which segments will be scanned. If partitioning is enabled for the Pinot table and the query created in Spark scans only specific partitions, only the required Pinot server and segment information is fetched (that is, segment pruning is applied before reading data, just as Pinot brokers do). For more information, see [Optimizing Scatter and Gather in Pinot](https://docs.pinot.apache.org/operators/operating-pinot/tuning/routing#optimizing-scatter-and-gather)
+
+### Table Querying
+The connector uses SQL to query Pinot tables.
+
+It creates realtime and offline queries based on the filters and required columns.
+- If the queried table type is `OFFLINE` or `REALTIME`, routing table information is fetched for that specific table type.
+- If the queried table type is `HYBRID`, both realtime and offline routing table information is fetched. The connector also receives the `TimeBoundary` information for the table and uses it in the queries so that the overlap between realtime and offline segment data is queried exactly once. For more information, see [Pinot Broker](https://docs.pinot.apache.org/basics/components/broker). A gRPC-based hybrid read is sketched below.
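+
+For instance, a hybrid table can also be read through the streaming gRPC path by adding the `useGrpcServer` option described in the parameter table below (a sketch mirroring the connector's bundled example tests):
+
+```scala
+val grpcDf = spark.read
+  .format("pinot")
+  .option("table", "airlineStats")
+  .option("tableType", "hybrid")
+  .option("useGrpcServer", "true")
+  .load()
+```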
+
+### Query Generation
+
+Example queries generated for the usages below (assume that the `airlineStats` table is hybrid and its TimeBoundary information is `DaysSinceEpoch, 16084`):
+
+```scala
+val df = spark.read
+  .format("pinot")
+  .option("table", "airlineStats")
+  .option("tableType", "hybrid")
+  .load()
+```
+
+For the above usage, the following realtime and offline SQL queries will be created:
+
+- Offline query: `select * from airlineStats_OFFLINE where DaysSinceEpoch < 16084 LIMIT {Int.MaxValue}`
+
+- Realtime query: `select * from airlineStats_REALTIME where DaysSinceEpoch >= 16084 LIMIT {Int.MaxValue}`
+
+```scala
+val df = spark.read
+  .format("pinot")
+  .option("table", "airlineStats")
+  .option("tableType", "offline")
+  .load()
+  .filter($"DestStateName" === "Florida")
+  .filter($"Origin" === "ORD")
+  .select($"DestStateName", $"Origin", $"Carrier")
+```
+
+- Offline query: `select DestStateName, Origin, Carrier from airlineStats_OFFLINE where DestStateName = 'Florida' and Origin = 'ORD' LIMIT {Int.MaxValue}`
+
+**Note: A limit is added to every query because the generated queries are converted to the Pinot `BrokerRequest` class, which sets the limit to `10` automatically. Therefore, `LIMIT` is set to `Int.MaxValue` to prevent this issue.**
+
+### Connector Read Parameters
+| Configuration | Description | Required | Default Value |
+| ------------- | ------------- | ------------- | ------------- |
+| table | Pinot table name without table type | Yes | - |
+| tableType | Pinot table type (`realtime`, `offline` or `hybrid`) | Yes | - |
+| controller | Pinot controller URL and port in `url:port` format without a scheme. The connector does not support the `https` scheme for now. | No | localhost:9000 |
+| broker | Pinot broker URL and port in `url:port` format without a scheme. If not specified, the connector will find the broker instances of the table automatically. The connector does not support the `https` scheme for now. | No | Fetch broker instances of table from Pinot Controller |
+| usePushDownFilters | Whether to push filters to the Pinot servers. If true, data exchange between the Pinot servers and Spark is minimized. | No | true |
+| segmentsPerSplit | The maximum number of segments scanned by a Pinot server in one connection | No | 3 |
+| pinotServerTimeoutMs | The maximum timeout (ms) to get data from a Pinot server | No | 10 mins |
+| useGrpcServer | Boolean value to enable reads via gRPC. This option is more memory efficient on both the Pinot server and the Spark executor side because it utilizes streaming. Requires gRPC to be enabled on the Pinot server. | No | false |
diff --git a/pinot-connectors/pinot-spark-3-connector/pom.xml b/pinot-connectors/pinot-spark-3-connector/pom.xml
new file mode 100644
index 000000000000..2ad246e0cbe4
--- /dev/null
+++ b/pinot-connectors/pinot-spark-3-connector/pom.xml
@@ -0,0 +1,324 @@
+ + + + 4.0.0 + + pinot-connectors + org.apache.pinot + 0.13.0-SNAPSHOT + .. + + pinot-spark-3-connector + Pinot Spark 3 Connector + https://pinot.apache.org/ + + ${basedir}/../.. 
+ 3.2.1 + 4.8 + 3.1.1 + org.apache.pinot.\$internal + + + false + + + + + scala-2.12 + + true + + + 2.12.11 + 2.12 + + + + org.apache.spark + spark-sql_${scala.compat.version} + ${spark.version} + provided + + + org.antlr + antlr4-runtime + + + org.apache.curator + curator-recipes + + + com.thoughtworks.paranamer + paranamer + + + org.scala-lang.modules + scala-xml_${scala.compat.version} + + + org.scala-lang + scala-library + + + com.zaxxer + HikariCP-java7 + + + + + + org.scala-lang + scala-library + ${scala.version} + provided + + + + org.scalatest + scalatest_${scala.compat.version} + ${scalatest.version} + test + + + org.scala-lang.modules + scala-xml_${scala.compat.version} + + + org.scala-lang + scala-library + + + org.scala-lang + scala-reflect + + + + + + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + 0.6.1 + + com.google.protobuf:protoc:3.19.2:exe:${os.detected.classifier} + grpc-java + io.grpc:protoc-gen-grpc-java:1.44.1:exe:${os.detected.classifier} + + + + + compile + compile-custom + + + + + + maven-shade-plugin + + + package + + shade + + + + + com + ${shadeBase}.com + + com.google.protobuf.** + com.google.common.** + + + + + + + + + net.alchim31.maven + scala-maven-plugin + 3.2.2 + + + eclipse-add-source + + add-source + + + + scala-compile-first + process-resources + + compile + + + + scala-test-compile-first + process-test-resources + + testCompile + + + + attach-scaladocs + verify + + doc-jar + + + + + ${scala.version} + + -unchecked + -deprecation + -feature + + + -Xms1024m + -Xmx1024m + + + -source + ${jdk.version} + -target + ${jdk.version} + -Xlint:all,-serial,-path + + + + + + + + release-sign-artifacts + + + + + org.apache.maven.plugins + maven-gpg-plugin + 1.6 + + + + + + + + src/main/scala + src/test/scala + + + src/main/resources + + + + + src/test/resources + + + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.7 + + true + + + + + org.scalatest + scalatest-maven-plugin + 1.0 + + ${project.build.directory}/surefire-reports + . 
+ false + + + + test + + test + + + + + + + + + + org.apache.pinot + pinot-common + + + org.apache.zookeeper + zookeeper + + + org.antlr + antlr4-runtime + + + + + org.apache.pinot + pinot-spark-common + ${project.parent.version} + + + org.apache.pinot + pinot-core + ${project.parent.version} + + + org.antlr + antlr4-runtime + + + + + provided + org.antlr + antlr4-runtime + ${antlr-runtime.version} + + + test + javax.servlet + javax.servlet-api + 3.0.1 + + + + diff --git a/pinot-connectors/pinot-spark-3-connector/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/pinot-connectors/pinot-spark-3-connector/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister new file mode 100644 index 000000000000..6bf835b475e6 --- /dev/null +++ b/pinot-connectors/pinot-spark-3-connector/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -0,0 +1 @@ +org.apache.pinot.connector.spark.v3.datasource.PinotDataSource diff --git a/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataSource.scala b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataSource.scala new file mode 100644 index 000000000000..a551218cbbcd --- /dev/null +++ b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotDataSource.scala @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.connector.spark.v3.datasource + +import org.apache.pinot.connector.spark.common.{PinotClusterClient, PinotDataSourceReadOptions} +import org.apache.spark.sql.connector.catalog.{Table, TableProvider} +import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.sources.DataSourceRegister +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +import java.util + +/** + * PinotDataSource implements TableProvider interface of Spark DataSourceV2 API + * to provide read access to Pinot tables. 
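+ *
+ * A minimal usage sketch (assuming a running Pinot cluster with the example `airlineStats` table;
+ * the source is registered under the short name "pinot" below):
+ * {{{
+ *   val df = spark.read
+ *     .format("pinot")
+ *     .option("table", "airlineStats")
+ *     .option("tableType", "offline")
+ *     .load()
+ * }}}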
+ */ +class PinotDataSource extends TableProvider with DataSourceRegister { + override def shortName(): String = "pinot" + + override def inferSchema(options: CaseInsensitiveStringMap): StructType = { + val readParameters = PinotDataSourceReadOptions.from(options) + val tableName = readParameters.tableName + val controller = readParameters.controller + + val pinotTableSchema = + PinotClusterClient.getTableSchema(controller, tableName) + TypeConverter.pinotSchemaToSparkSchema(pinotTableSchema) + } + + override def getTable(schema: StructType, + partitioning: Array[Transform], + properties: util.Map[String, String]): Table = { + val tableName = properties.get(PinotDataSourceReadOptions.CONFIG_TABLE_NAME) + new PinotTable(tableName, schema) + } + + override def supportsExternalMetadata(): Boolean = true +} diff --git a/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotInputPartition.scala b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotInputPartition.scala new file mode 100644 index 000000000000..b9b4596c3e15 --- /dev/null +++ b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotInputPartition.scala @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.connector.spark.v3.datasource + +import org.apache.pinot.connector.spark.common.PinotDataSourceReadOptions +import org.apache.pinot.connector.spark.common.partition.PinotSplit +import org.apache.spark.sql.connector.read.InputPartition +import org.apache.spark.sql.types.StructType + +/** + * PinotInputPartition: Implements Spark's InputPartition which convey partition related information + * from Spark master to executors. This class is serialized and sent across the network so it should + * be kept lean for good performance. + * + * @param schema Schema for the scan/read operation. 
This can be a subset of the tables schema + * @param partitionId Integer which is used as requestId when sending query to Pinot servers + * @param pinotSplit An instance of PinotSplit which encapsulates segment and query information + * @param dataSourceOptions PinotDataSourceReadOptions instance created for the read + */ +case class PinotInputPartition( + schema: StructType, + partitionId: Int, + pinotSplit: PinotSplit, + dataSourceOptions: PinotDataSourceReadOptions) + extends InputPartition { +} diff --git a/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotScan.scala b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotScan.scala new file mode 100644 index 000000000000..a50967fe5e57 --- /dev/null +++ b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotScan.scala @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.connector.spark.v3.datasource + +import org.apache.pinot.common.datatable.DataTable +import org.apache.pinot.connector.spark.common.partition.{PinotSplit, PinotSplitter} +import org.apache.pinot.connector.spark.common.query.ScanQuery +import org.apache.pinot.connector.spark.common.reader.PinotAbstractPartitionReader +import org.apache.pinot.connector.spark.common.{InstanceInfo, PinotClusterClient, PinotDataSourceReadOptions} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.connector.read.{ + Batch, + InputPartition, + PartitionReader, + PartitionReaderFactory, + Scan +} +import org.apache.spark.sql.types.StructType + +import scala.collection.mutable.Map + +/** + * PinotScan implements Spark's 'Scan', which is a logical representation of a scan operation, + * and also adds the 'Batch' mixin to support batch reads. 
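+ * `planInputPartitions` fetches the routing table for the generated query from the broker and,
+ * via `PinotSplitter`, creates one `PinotInputPartition` per (server, segment group) split,
+ * bounded by the `segmentsPerSplit` read option.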
+ * + * @param query An instance of ScanQuery which encapsulates SQL queries to be executed + * @param schema + * @param readParameters + */ +class PinotScan( + query: ScanQuery, + schema: StructType, + readParameters: PinotDataSourceReadOptions) + extends Scan with Batch { + + override def readSchema(): StructType = schema + + override def toBatch: Batch = this + + override def planInputPartitions(): Array[InputPartition] = { + val routingTable = PinotClusterClient.getRoutingTable(readParameters.broker, query) + + val instanceInfo : Map[String, InstanceInfo] = Map() + val instanceInfoReader = (instance:String) => { // cached reader to reduce network round trips + instanceInfo.getOrElseUpdate( + instance, + PinotClusterClient.getInstanceInfo(readParameters.controller, instance) + ) + } + + PinotSplitter + .generatePinotSplits(query, routingTable, instanceInfoReader, readParameters) + .zipWithIndex + .map { + case (pinotSplit, partitionId) => + PinotInputPartition(readSchema(), partitionId, pinotSplit, readParameters) + .asInstanceOf[InputPartition] + } + .toArray + } + + override def createReaderFactory(): PartitionReaderFactory = { + // necessary to make PartitionReaderFactory serializable + val _schema = this.schema + + (partition: InputPartition) => { + partition match { + case p: PinotInputPartition => + new PartitionReader[InternalRow] with PinotAbstractPartitionReader[InternalRow] { + override def _partitionId: Int = p.partitionId + override def _pinotSplit: PinotSplit = p.pinotSplit + override def _dataSourceOptions: PinotDataSourceReadOptions = p.dataSourceOptions + override def _translator: DataTable => Seq[InternalRow] = + TypeConverter.pinotDataTableToInternalRows(_, _schema) + } + case _ => + throw new Exception("Unknown InputPartition type. Expecting PinotInputPartition") + } + } + } +} diff --git a/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotScanBuilder.scala b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotScanBuilder.scala new file mode 100644 index 000000000000..a3e1bf1b0204 --- /dev/null +++ b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotScanBuilder.scala @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.connector.spark.v3.datasource + +import org.apache.pinot.connector.spark.common.query.ScanQueryGenerator +import org.apache.pinot.connector.spark.common.{PinotClusterClient, PinotDataSourceReadOptions} +import org.apache.pinot.connector.spark.v3.datasource.query.FilterPushDown +import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns} +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types.StructType + +/** + * PinotScanBuilder: Implementation of Spark DataSourceV2 API's ScanBuilder interface. + * This is the main class in which various push down functionality is implemented. + * We currently support pushing down Filters (where clause) and RequiredColumns (selection). + * + * @param readParameters PinotDataSourceReadOptions instance for the read + */ +class PinotScanBuilder(readParameters: PinotDataSourceReadOptions) + extends ScanBuilder with SupportsPushDownFilters with SupportsPushDownRequiredColumns { + + private var acceptedFilters: Array[Filter] = Array.empty + private var currentSchema: StructType = _ + + override def build(): Scan = { + // Time boundary is used when table is hybrid to ensure that the overlap + // between realtime and offline segment data is queried exactly once + val timeBoundaryInfo = + if (readParameters.tableType.isDefined) { + None + } else { + PinotClusterClient.getTimeBoundaryInfo(readParameters.broker, readParameters.tableName) + } + + val whereCondition = FilterPushDown.compileFiltersToSqlWhereClause(this.acceptedFilters) + val scanQuery = ScanQueryGenerator.generate( + readParameters.tableName, + readParameters.tableType, + timeBoundaryInfo, + currentSchema.fieldNames, + whereCondition + ) + + new PinotScan(scanQuery, currentSchema, readParameters) + } + + override def pushFilters(filters: Array[Filter]): Array[Filter] = { + if (readParameters.usePushDownFilters) { + val (acceptedFilters, postScanFilters) = FilterPushDown.acceptFilters(filters) + this.acceptedFilters = acceptedFilters + postScanFilters + } else { + filters + } + } + + override def pushedFilters(): Array[Filter] = { + this.acceptedFilters + } + + override def pruneColumns(requiredSchema: StructType): Unit = { + this.currentSchema = requiredSchema + } + +} diff --git a/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotTable.scala b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotTable.scala new file mode 100644 index 000000000000..28cced506e8f --- /dev/null +++ b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotTable.scala @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.connector.spark.v3.datasource + +import org.apache.pinot.connector.spark.common.PinotDataSourceReadOptions +import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability} +import org.apache.spark.sql.connector.read.ScanBuilder +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +import java.util + +/** + * PinotTable implements Spark's Table interface to expose a logical representation of a + * Pinot table. This is the interface where Spark discovers table capabilities such as + * 'SupportsRead'. For now Pinot tables only support batch reads. + * + * @param name Pinot table name + * @param schema Schema provided by Spark. This can be different than table schema + */ +class PinotTable(name: String, schema: StructType) extends Table with SupportsRead { + override def name(): String = name + + override def schema(): StructType = schema + + override def capabilities(): util.Set[TableCapability] = { + util.EnumSet.of(TableCapability.BATCH_READ) + } + + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { + val readParameters = PinotDataSourceReadOptions.from(options) + new PinotScanBuilder(readParameters) + } +} diff --git a/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/TypeConverter.scala b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/TypeConverter.scala new file mode 100644 index 000000000000..451bc5af72be --- /dev/null +++ b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/TypeConverter.scala @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.connector.spark.v3.datasource + +import org.apache.pinot.common.datatable.DataTable +import org.apache.pinot.common.utils.DataSchema.ColumnDataType +import org.apache.pinot.connector.spark.common.PinotException +import org.apache.pinot.spi.data.{FieldSpec, Schema} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.util.ArrayData +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +import scala.collection.JavaConverters._ + +/** + * Helper methods for spark-pinot conversions + */ +private[pinot] object TypeConverter { + + /** Convert a Pinot schema to Spark schema. 
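+   * Single-value Pinot fields map to scalar Spark types (e.g. INT -> IntegerType,
+   * TIMESTAMP -> LongType, BYTES -> ArrayType(ByteType)); multi-value fields are wrapped in ArrayType.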
*/ + def pinotSchemaToSparkSchema(schema: Schema): StructType = { + val structFields = schema.getAllFieldSpecs.asScala.map { field => + val sparkDataType = pinotDataTypeToSparkDataType(field.getDataType) + if (field.isSingleValueField) { + StructField(field.getName, sparkDataType) + } else { + StructField(field.getName, ArrayType(sparkDataType)) + } + } + StructType(structFields.toList) + } + + private def pinotDataTypeToSparkDataType(dataType: FieldSpec.DataType): DataType = + dataType match { + case FieldSpec.DataType.INT => IntegerType + case FieldSpec.DataType.LONG => LongType + case FieldSpec.DataType.FLOAT => FloatType + case FieldSpec.DataType.DOUBLE => DoubleType + case FieldSpec.DataType.STRING => StringType + case FieldSpec.DataType.BYTES => ArrayType(ByteType) + case FieldSpec.DataType.TIMESTAMP => LongType + case FieldSpec.DataType.BOOLEAN => BooleanType + case _ => + throw PinotException(s"Unsupported pinot data type '$dataType") + } + + /** Convert Pinot DataTable to Seq of InternalRow */ + def pinotDataTableToInternalRows( + dataTable: DataTable, + sparkSchema: StructType): Seq[InternalRow] = { + val dataTableColumnNames = dataTable.getDataSchema.getColumnNames + (0 until dataTable.getNumberOfRows).map { rowIndex => + // spark schema is used to ensure columns order + val columns = sparkSchema.fields.map { field => + val colIndex = dataTableColumnNames.indexOf(field.name) + if (colIndex < 0) { + throw PinotException(s"'${field.name}' not found in Pinot server response") + } else { + // pinot column data type can be used directly, + // because all of them is supported in spark schema + val columnDataType = dataTable.getDataSchema.getColumnDataType(colIndex) + readPinotColumnData(dataTable, columnDataType, rowIndex, colIndex) + } + } + InternalRow.fromSeq(columns) + } + } + + private def readPinotColumnData( + dataTable: DataTable, + columnDataType: ColumnDataType, + rowIndex: Int, + colIndex: Int): Any = columnDataType match { + // single column types + case ColumnDataType.STRING => + UTF8String.fromString(dataTable.getString(rowIndex, colIndex)) + case ColumnDataType.INT => + dataTable.getInt(rowIndex, colIndex) + case ColumnDataType.LONG => + dataTable.getLong(rowIndex, colIndex) + case ColumnDataType.FLOAT => + dataTable.getFloat(rowIndex, colIndex) + case ColumnDataType.DOUBLE => + dataTable.getDouble(rowIndex, colIndex) + case ColumnDataType.TIMESTAMP => + dataTable.getLong(rowIndex, colIndex) + case ColumnDataType.BOOLEAN => + dataTable.getInt(rowIndex, colIndex) == 1 + + // array column types + case ColumnDataType.STRING_ARRAY => + ArrayData.toArrayData( + dataTable.getStringArray(rowIndex, colIndex).map(UTF8String.fromString).toSeq + ) + case ColumnDataType.INT_ARRAY => + ArrayData.toArrayData(dataTable.getIntArray(rowIndex, colIndex).toSeq) + case ColumnDataType.LONG_ARRAY => + ArrayData.toArrayData(dataTable.getLongArray(rowIndex, colIndex).toSeq) + case ColumnDataType.FLOAT_ARRAY => + ArrayData.toArrayData(dataTable.getFloatArray(rowIndex, colIndex).toSeq) + case ColumnDataType.DOUBLE_ARRAY => + ArrayData.toArrayData(dataTable.getDoubleArray(rowIndex, colIndex).toSeq) + case ColumnDataType.BYTES => + ArrayData.toArrayData(dataTable.getBytes(rowIndex, colIndex).getBytes) + case ColumnDataType.TIMESTAMP_ARRAY => + ArrayData.toArrayData(dataTable.getLongArray(rowIndex, colIndex).toSeq) + case ColumnDataType.BOOLEAN_ARRAY => + ArrayData.toArrayData( + dataTable.getIntArray(rowIndex, colIndex).map(i => i == 1).toSeq + ) + + case _ => + throw 
PinotException(s"'$columnDataType' is not supported") + } +} diff --git a/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/query/FilterPushDown.scala b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/query/FilterPushDown.scala new file mode 100644 index 000000000000..3d4e3f658df2 --- /dev/null +++ b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/query/FilterPushDown.scala @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.connector.spark.v3.datasource.query + +import java.sql.{Date, Timestamp} + +import org.apache.spark.sql.sources._ + +/** + * Helper methods to find valid filters, and convert spark filters to SQL where clause. + */ +private[pinot] object FilterPushDown { + + /** + * Create SQL 'where clause' from Spark filters. + * + * @param filters Supported spark filters + * @return where clause, or None if filters does not exists + */ + def compileFiltersToSqlWhereClause(filters: Array[Filter]): Option[String] = { + if (filters.isEmpty) { + None + } else { + Option(filters.flatMap(compileFilter).map(filter => s"($filter)").mkString(" AND ")) + } + } + + /** + * Accept only filters that supported in SQL. 
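+   * For example, `EqualTo("Origin", "ORD")` is accepted (and later compiled to `Origin = 'ORD'`),
+   * while any filter type not handled by `isFilterSupported` is left for Spark to evaluate after the scan.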
+ * + * @param filters Spark filters that contains valid and/or invalid filters + * @return Supported and unsupported filters + */ + def acceptFilters(filters: Array[Filter]): (Array[Filter], Array[Filter]) = { + filters.partition(isFilterSupported) + } + + private def isFilterSupported(filter: Filter): Boolean = filter match { + case _: EqualTo => true + case _: EqualNullSafe => true + case _: In => true + case _: LessThan => true + case _: LessThanOrEqual => true + case _: GreaterThan => true + case _: GreaterThanOrEqual => true + case _: IsNull => true + case _: IsNotNull => true + case _: StringStartsWith => true + case _: StringEndsWith => true + case _: StringContains => true + case _: Not => true + case _: Or => true + case _: And => true + case _ => false + } + + private def escapeSql(value: String): String = + if (value == null) null else value.replace("'", "''") + + private def compileValue(value: Any): Any = value match { + case stringValue: String => s"'${escapeSql(stringValue)}'" + case timestampValue: Timestamp => "'" + timestampValue + "'" + case dateValue: Date => "'" + dateValue + "'" + case arrayValue: Array[Any] => arrayValue.map(compileValue).mkString(", ") + case _ => value + } + + private def compileFilter(filter: Filter): Option[String] = { + val whereCondition = filter match { + case EqualTo(attr, value) => s"$attr = ${compileValue(value)}" + case EqualNullSafe(attr, value) => + s"NOT ($attr != ${compileValue(value)} OR $attr IS NULL OR " + + s"${compileValue(value)} IS NULL) OR " + + s"($attr IS NULL AND ${compileValue(value)} IS NULL)" + case LessThan(attr, value) => s"$attr < ${compileValue(value)}" + case GreaterThan(attr, value) => s"$attr > ${compileValue(value)}" + case LessThanOrEqual(attr, value) => s"$attr <= ${compileValue(value)}" + case GreaterThanOrEqual(attr, value) => s"$attr >= ${compileValue(value)}" + case IsNull(attr) => s"$attr IS NULL" + case IsNotNull(attr) => s"$attr IS NOT NULL" + case StringStartsWith(attr, value) => s"$attr LIKE '$value%'" + case StringEndsWith(attr, value) => s"$attr LIKE '%$value'" + case StringContains(attr, value) => s"$attr LIKE '%$value%'" + case In(attr, value) if value.isEmpty => + s"CASE WHEN $attr IS NULL THEN NULL ELSE FALSE END" + case In(attr, value) => s"$attr IN (${compileValue(value)})" + case Not(f) => compileFilter(f).map(p => s"NOT ($p)").orNull + case Or(f1, f2) => + val or = Seq(f1, f2).flatMap(compileFilter) + if (or.size == 2) { + or.map(p => s"($p)").mkString(" OR ") + } else { + null + } + case And(f1, f2) => + val and = Seq(f1, f2).flatMap(compileFilter) + if (and.size == 2) { + and.map(p => s"($p)").mkString(" AND ") + } else { + null + } + case _ => null + } + Option(whereCondition) + } + +} diff --git a/pinot-connectors/pinot-spark-3-connector/src/test/resources/log4j2.xml b/pinot-connectors/pinot-spark-3-connector/src/test/resources/log4j2.xml new file mode 100644 index 000000000000..439331f9d7b1 --- /dev/null +++ b/pinot-connectors/pinot-spark-3-connector/src/test/resources/log4j2.xml @@ -0,0 +1,36 @@ + + + + + + + + + + + + + + + + + diff --git a/pinot-connectors/pinot-spark-3-connector/src/test/resources/schema/pinot-schema.json b/pinot-connectors/pinot-spark-3-connector/src/test/resources/schema/pinot-schema.json new file mode 100644 index 000000000000..1cf62b287419 --- /dev/null +++ b/pinot-connectors/pinot-spark-3-connector/src/test/resources/schema/pinot-schema.json @@ -0,0 +1,75 @@ +{ + "schemaName" : "schemaName", + "dimensionFieldSpecs" : [ { + "name" : "stringDim", + "dataType" : 
"STRING" + }, { + "name" : "intDim", + "dataType" : "INT" + }, { + "name" : "floatDim", + "dataType" : "FLOAT", + "defaultNullValue" : 0.0 + }, { + "name" : "doubleDim", + "dataType" : "DOUBLE" + }, { + "name" : "longDim", + "dataType" : "LONG" + }, { + "name": "boolDim", + "dataType": "BOOLEAN" + }, { + "name" : "stringArrayDim", + "dataType" : "STRING", + "singleValueField" : false + }, { + "name" : "floatArrayDim", + "dataType" : "FLOAT", + "singleValueField" : false + }, { + "name": "boolArrayDim", + "dataType": "BOOLEAN", + "singleValueField": false + } ], + "metricFieldSpecs" : [ { + "name" : "floatMetric", + "dataType" : "FLOAT" + }, { + "name" : "doubleMetric", + "dataType" : "DOUBLE" + }, { + "name" : "longMetric", + "dataType" : "LONG" + }, { + "name" : "intMetric", + "dataType" : "INT", + "defaultNullValue" : 10 + }, { + "name" : "stringMetric", + "dataType" : "STRING" + }, { + "name" : "byteDim", + "dataType" : "BYTES" + } ], + "timeFieldSpec" : { + "incomingGranularitySpec" : { + "name" : "incomingTimeField", + "dataType" : "LONG", + "timeType" : "SECONDS" + }, + "outgoingGranularitySpec" : { + "name" : "outgoingTimeField", + "dataType" : "INT", + "timeType" : "DAYS" + } + }, + "dateTimeFieldSpecs": [ + { + "name": "timestampField", + "dataType": "TIMESTAMP", + "format": "1:MILLISECONDS:EPOCH", + "granularity": "1:SECONDS" + } + ] +} diff --git a/pinot-connectors/pinot-spark-3-connector/src/test/resources/schema/spark-schema.json b/pinot-connectors/pinot-spark-3-connector/src/test/resources/schema/spark-schema.json new file mode 100644 index 000000000000..a8187fcf0fc0 --- /dev/null +++ b/pinot-connectors/pinot-spark-3-connector/src/test/resources/schema/spark-schema.json @@ -0,0 +1,105 @@ +{ + "type" : "struct", + "fields" : [ { + "name" : "floatMetric", + "type" : "float", + "nullable" : true, + "metadata" : { } + }, { + "name" : "doubleMetric", + "type" : "double", + "nullable" : true, + "metadata" : { } + }, { + "name" : "longMetric", + "type" : "long", + "nullable" : true, + "metadata" : { } + }, { + "name" : "intMetric", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "stringMetric", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "stringDim", + "type" : "string", + "nullable" : true, + "metadata" : { } + }, { + "name" : "intDim", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : "floatDim", + "type" : "float", + "nullable" : true, + "metadata" : { } + }, { + "name" : "doubleDim", + "type" : "double", + "nullable" : true, + "metadata" : { } + }, { + "name" : "longDim", + "type" : "long", + "nullable" : true, + "metadata" : { } + }, { + "name" : "boolDim", + "type" : "boolean", + "nullable" : true, + "metadata" : { } + }, { + "name" : "stringArrayDim", + "type" : { + "type" : "array", + "elementType" : "string", + "containsNull" : true + }, + "nullable" : true, + "metadata" : { } + }, { + "name" : "floatArrayDim", + "type" : { + "type" : "array", + "elementType" : "float", + "containsNull" : true + }, + "nullable" : true, + "metadata" : { } + }, { + "name" : "boolArrayDim", + "type" : { + "type" : "array", + "elementType" : "boolean", + "containsNull" : true + }, + "nullable" : true, + "metadata" : { } + }, { + "name" : "byteDim", + "type" : { + "type" : "array", + "elementType" : "byte", + "containsNull" : true + }, + "nullable" : true, + "metadata" : { } + }, { + "name" : "outgoingTimeField", + "type" : "integer", + "nullable" : true, + "metadata" : { } + }, { + "name" : 
"timestampField", + "type" : "long", + "nullable" : true, + "metadata" : { } + } ] +} diff --git a/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/BaseTest.scala b/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/BaseTest.scala new file mode 100644 index 000000000000..30747a9b80be --- /dev/null +++ b/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/BaseTest.scala @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.connector.spark.v3.datasource + +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers + +trait BaseTest extends AnyFunSuite with Matchers diff --git a/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/ExampleSparkPinotConnectorTest.scala b/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/ExampleSparkPinotConnectorTest.scala new file mode 100644 index 000000000000..48692ae0f791 --- /dev/null +++ b/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/ExampleSparkPinotConnectorTest.scala @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.connector.spark.v3.datasource + +import org.apache.pinot.connector.spark.common.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.types.{DataTypes, StructField, StructType} + +/** + * Example object to test connector with all of features. 
+ * To run this class, first of all, + * run pinot locally(https://docs.pinot.apache.org/basics/getting-started/running-pinot-locally) + */ +object ExampleSparkPinotConnectorTest extends Logging { + + def main(args: Array[String]): Unit = { + implicit val spark: SparkSession = SparkSession + .builder() + .appName("spark-pinot-connector-test") + .master("local") + .getOrCreate() + + readOffline() + readHybrid() + readHybridWithSpecificSchema() + readHybridWithFilters() + readHybridViaGrpc() + readRealtimeViaGrpc() + readRealtimeWithFilterViaGrpc() + readHybridWithFiltersViaGrpc() + readRealtimeWithSelectionColumns() + applyJustSomeFilters() + } + + def readOffline()(implicit spark: SparkSession): Unit = { + log.info("## Reading `airlineStats_OFFLINE` table... ##") + val data = spark.read + .format("pinot") + .option("table", "airlineStats") + .option("tableType", "offline") + .load() + + data.show() + } + + def readHybrid()(implicit spark: SparkSession): Unit = { + log.info("## Reading `airlineStats_OFFLINE and airlineStats_REALTIME` tables... ##") + val data = spark.read + .format("pinot") + .option("table", "airlineStats") + .option("tableType", "hybrid") + .load() + + data.show() + } + + def readHybridWithSpecificSchema()(implicit spark: SparkSession): Unit = { + log.info("## Reading `airlineStats_OFFLINE and airlineStats_REALTIME` tables with specific schema... ##") + val schema = StructType( + Seq( + StructField("Distance", DataTypes.IntegerType), + StructField("AirlineID", DataTypes.IntegerType), + StructField("DaysSinceEpoch", DataTypes.IntegerType), + StructField("DestStateName", DataTypes.StringType), + StructField("Origin", DataTypes.StringType), + StructField("Carrier", DataTypes.StringType) + ) + ) + + val data = spark.read + .format("pinot") + .option("table", "airlineStats") + .option("tableType", "HYBRID") + .schema(schema) + .load() + + data.show() + } + + def readOfflineWithFilters()(implicit spark: SparkSession): Unit = { + import spark.implicits._ + log.info("## Reading `airlineStats_OFFLINE` table with filter push down... ##") + val data = spark.read + .format("pinot") + .option("table", "airlineStats") + .option("tableType", "OFFLINE") + .load() + .filter($"AirlineID" === 19805) + .filter($"DestStateName" === "Florida") + .filter($"DaysSinceEpoch".isin(16101, 16084, 16074)) + .filter($"Origin" === "ORD") + + data.show() + } + + def readHybridWithFilters()(implicit spark: SparkSession): Unit = { + import spark.implicits._ + log.info("## Reading `airlineStats_OFFLINE and airlineStats_REALTIME` tables with filter push down... ##") + // should return 1 data, because connector ensure that the overlap + // between realtime and offline segment data is queried exactly once + val data = spark.read + .format("pinot") + .option("table", "airlineStats") + .option("tableType", "hybrid") + .load() + .filter($"AirlineID" === 19805) + .filter($"DestStateName" === "Florida") + .filter($"DaysSinceEpoch".isin(16101, 16084, 16074)) + .filter($"Origin" === "ORD") + + data.show() + } + + def readRealtimeWithSelectionColumns()(implicit spark: SparkSession): Unit = { + import spark.implicits._ + log.info("## Reading `airlineStats_REALTIME` table with column push down... ##") + val data = spark.read + .format("pinot") + .option("table", "airlineStats") + .option("tableType", "realtime") + .load() + .select($"FlightNum", $"Origin", $"DestStateName") + + data.show() + } + + def readHybridViaGrpc()(implicit spark: SparkSession): Unit = { + log.info("## Reading `airlineStats` table... 
##") + val data = spark.read + .format("pinot") + .option("table", "airlineStats") + .option("tableType", "hybrid") + .option("useGrpcServer", "true") + .load() + + data.show() + } + + def readRealtimeViaGrpc()(implicit spark: SparkSession): Unit = { + log.info("## Reading `airlineStats_REALTIME` table... ##") + val data = spark.read + .format("pinot") + .option("table", "airlineStats") + .option("tableType", "realtime") + .option("useGrpcServer", "true") + .load() + + data.show() + } + + def readRealtimeWithFilterViaGrpc()(implicit spark: SparkSession): Unit = { + import spark.implicits._ + log.info("## Reading `airlineStats_REALTIME` table... ##") + val data = spark.read + .format("pinot") + .option("table", "airlineStats") + .option("tableType", "realtime") + .option("useGrpcServer", "true") + .load() + .filter($"DestWac" === 5) + .select($"FlightNum", $"Origin", $"DestStateName") + + data.show() + } + + def readHybridWithFiltersViaGrpc()(implicit spark: SparkSession): Unit = { + import spark.implicits._ + log.info("## Reading `airlineStats_OFFLINE` table with filter push down... ##") + val data = spark.read + .format("pinot") + .option("table", "airlineStats") + .option("tableType", "hybrid") + .option("useGrpcServer", "true") + .load() + .filter($"DestStateName" === "Florida") + + data.show() + } + + + def applyJustSomeFilters()(implicit spark: SparkSession): Unit = { + import spark.implicits._ + log.info("## Reading `airlineStats_OFFLINE and airlineStats_REALTIME` tables with filter push down... ##") + val data = spark.read + .format("pinot") + .option("table", "airlineStats") + .option("tableType", "hybrid") + .load() + .filter($"DestStateName" === "Florida") + .filter($"Origin" === "ORD") + .select($"DestStateName", $"Origin", $"Distance", $"AirlineID") + + data.show() + } + +} diff --git a/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/PinotScanBuilderTest.scala b/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/PinotScanBuilderTest.scala new file mode 100644 index 000000000000..bbcfd6fcffe6 --- /dev/null +++ b/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/PinotScanBuilderTest.scala @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.pinot.connector.spark.v3.datasource
+
+import org.apache.pinot.connector.spark.common.PinotDataSourceReadOptions
+import org.apache.spark.sql.sources.{Filter, LessThan}
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+
+
+class PinotScanBuilderTest extends BaseTest {
+  test("Builder should build a PinotScan with given filters") {
+    val optMap = new java.util.HashMap[String, String]()
+    optMap.put("table", "myTable")
+    optMap.put("tableType", "REALTIME")
+    optMap.put("broker", "localhost:7177")
+    val readOptions = PinotDataSourceReadOptions.from(optMap)
+
+    // create a scan builder with custom schema
+    val builder = new PinotScanBuilder(readOptions)
+    builder.pruneColumns(StructType(
+      Seq(StructField("myCol", IntegerType))
+    ))
+
+    // push a filter
+    val lessThan = LessThan("myCol", 100)
+    builder.pushFilters(Array[Filter](lessThan))
+    val pushedFilters = builder.pushedFilters()
+    pushedFilters.length shouldEqual 1
+    pushedFilters(0) shouldBe a [LessThan]
+
+    // run the builder
+    val scan = builder.build()
+    scan shouldBe a [PinotScan]
+  }
+}
diff --git a/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/PinotScanTest.scala b/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/PinotScanTest.scala
new file mode 100644
index 000000000000..717003ba4c9d
--- /dev/null
+++ b/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/PinotScanTest.scala
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ +package org.apache.pinot.connector.spark.v3.datasource + +import org.apache.pinot.connector.spark.common.PinotDataSourceReadOptions +import org.apache.pinot.connector.spark.common.query.ScanQuery +import org.apache.pinot.spi.config.table.TableType +import org.apache.spark.sql.connector.read.PartitionReaderFactory +import org.apache.spark.sql.types.{IntegerType, StructField, StructType} + +class PinotScanTest extends BaseTest { + test("Scan should build and return a partition reader") { + val optMap = new java.util.HashMap[String, String]() + optMap.put("table", "myTable") + optMap.put("tableType", "REALTIME") + optMap.put("broker", "localhost:7177") + val readOptions = PinotDataSourceReadOptions.from(optMap) + val schema = StructType( + Seq(StructField("myCol", IntegerType)) + ) + val scanQuery = ScanQuery( + "myTable", + Some(TableType.OFFLINE), + "select * from myTable", + "") + + val scan = new PinotScan(scanQuery, schema, readOptions) + val readerFactory = scan.createReaderFactory() + + + + // assert PinotScan creates a PartitionReaderFactory + readerFactory shouldBe a [PartitionReaderFactory] + } +} diff --git a/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/TypeConverterTest.scala b/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/TypeConverterTest.scala new file mode 100644 index 000000000000..1b4ade0af1b5 --- /dev/null +++ b/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/TypeConverterTest.scala @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.connector.spark.v3.datasource + +import org.apache.pinot.common.utils.DataSchema +import org.apache.pinot.common.utils.DataSchema.ColumnDataType +import org.apache.pinot.connector.spark.common.PinotException +import org.apache.pinot.core.common.datatable.DataTableBuilderFactory +import org.apache.pinot.spi.data.Schema +import org.apache.pinot.spi.utils.ByteArray +import org.apache.spark.sql.catalyst.util.ArrayData +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +import scala.io.Source + +/** + * Test pinot/spark conversions like schema, data table etc. 
+ */ +class TypeConverterTest extends BaseTest { + + test("Pinot DataTable should be converted to Spark InternalRows") { + val columnNames = Array( + "strCol", + "intCol", + "longCol", + "floatCol", + "doubleCol", + "strArrayCol", + "intArrayCol", + "longArrayCol", + "floatArrayCol", + "doubleArrayCol", + "byteType", + "timestampArrayCol", + "timestampCol", + "booleanArrayCol", + "booleanCol", + ) + val columnTypes = Array( + ColumnDataType.STRING, + ColumnDataType.INT, + ColumnDataType.LONG, + ColumnDataType.FLOAT, + ColumnDataType.DOUBLE, + ColumnDataType.STRING_ARRAY, + ColumnDataType.INT_ARRAY, + ColumnDataType.LONG_ARRAY, + ColumnDataType.FLOAT_ARRAY, + ColumnDataType.DOUBLE_ARRAY, + ColumnDataType.BYTES, + ColumnDataType.TIMESTAMP_ARRAY, + ColumnDataType.TIMESTAMP, + ColumnDataType.BOOLEAN_ARRAY, + ColumnDataType.BOOLEAN, + ) + val dataSchema = new DataSchema(columnNames, columnTypes) + + val dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(dataSchema) + dataTableBuilder.startRow() + dataTableBuilder.setColumn(0, "strValueDim") + dataTableBuilder.setColumn(1, 5) + dataTableBuilder.setColumn(2, 3L) + dataTableBuilder.setColumn(3, 10.05f) + dataTableBuilder.setColumn(4, 2.3d) + dataTableBuilder.setColumn(5, Array[String]("strArr1", "null")) + dataTableBuilder.setColumn(6, Array[Int](1, 2, 0)) + dataTableBuilder.setColumn(7, Array[Long](10L, 0)) + dataTableBuilder.setColumn(8, Array[Float](0, 15.20f)) + dataTableBuilder.setColumn(9, Array[Double](0, 10.3d)) + dataTableBuilder.setColumn(10, new ByteArray("byte_test".getBytes)) + dataTableBuilder.setColumn(11, Array[Long](123L,456L)) + dataTableBuilder.setColumn(12, 123L) + dataTableBuilder.setColumn(13, Array[Int](1,0,1,0)) + dataTableBuilder.setColumn(14, 1) + + dataTableBuilder.finishRow() + val dataTable = dataTableBuilder.build() + + val schema = StructType( + Seq( + StructField("intArrayCol", ArrayType(IntegerType)), + StructField("intCol", IntegerType), + StructField("doubleArrayCol", ArrayType(DoubleType)), + StructField("doubleCol", DoubleType), + StructField("strArrayCol", ArrayType(StringType)), + StructField("longCol", LongType), + StructField("longArrayCol", ArrayType(LongType)), + StructField("strCol", StringType), + StructField("floatArrayCol", ArrayType(FloatType)), + StructField("floatCol", FloatType), + StructField("byteType", ArrayType(ByteType)), + StructField("timestampArrayCol", ArrayType(LongType)), + StructField("timestampCol", LongType), + StructField("booleanArrayCol", ArrayType(BooleanType)), + StructField("booleanCol", BooleanType), + ) + ) + + val result = TypeConverter.pinotDataTableToInternalRows(dataTable, schema).head + result.getArray(0) shouldEqual ArrayData.toArrayData(Seq(1, 2, 0)) + result.getInt(1) shouldEqual 5 + result.getArray(2) shouldEqual ArrayData.toArrayData(Seq(0d, 10.3d)) + result.getDouble(3) shouldEqual 2.3d + result.getArray(4) shouldEqual ArrayData.toArrayData( + Seq("strArr1", "null").map(UTF8String.fromString) + ) + result.getLong(5) shouldEqual 3L + result.getArray(6) shouldEqual ArrayData.toArrayData(Seq(10L, 0L)) + result.getString(7) shouldEqual "strValueDim" + result.getArray(8) shouldEqual ArrayData.toArrayData(Seq(0f, 15.20f)) + result.getFloat(9) shouldEqual 10.05f + result.getArray(10) shouldEqual ArrayData.toArrayData("byte_test".getBytes) + result.getArray(11) shouldEqual ArrayData.toArrayData(Seq(123L,456L)) + result.getLong(12) shouldEqual 123L + result.getArray(13) shouldEqual ArrayData.toArrayData(Seq(true, false, true, false)) + result.getBoolean(14) 
shouldEqual true + } + + test("Method should throw field not found exception while converting pinot data table") { + val columnNames = Array("strCol", "intCol") + val columnTypes = Array(ColumnDataType.STRING, ColumnDataType.INT) + val dataSchema = new DataSchema(columnNames, columnTypes) + + val dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(dataSchema) + dataTableBuilder.startRow() + dataTableBuilder.setColumn(0, "strValueDim") + dataTableBuilder.setColumn(1, 5) + dataTableBuilder.finishRow() + val dataTable = dataTableBuilder.build() + + val schema = StructType( + Seq( + StructField("strCol", StringType), + StructField("intCol", IntegerType), + StructField("longCol", LongType) + ) + ) + + val exception = intercept[PinotException] { + TypeConverter.pinotDataTableToInternalRows(dataTable, schema) + } + + exception.getMessage shouldEqual s"'longCol' not found in Pinot server response" + } + + test("Pinot schema should be converted to spark schema") { + val pinotSchemaAsString = Source.fromResource("schema/pinot-schema.json").mkString + val resultSchema = TypeConverter.pinotSchemaToSparkSchema(Schema.fromString(pinotSchemaAsString)) + val sparkSchemaAsString = Source.fromResource("schema/spark-schema.json").mkString + val sparkSchema = DataType.fromJson(sparkSchemaAsString).asInstanceOf[StructType] + resultSchema.fields should contain theSameElementsAs sparkSchema.fields + } +} diff --git a/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/query/FilterPushDownTest.scala b/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/query/FilterPushDownTest.scala new file mode 100644 index 000000000000..6202257b9b1f --- /dev/null +++ b/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/query/FilterPushDownTest.scala @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.connector.spark.v3.datasource.query + +import java.sql.{Date, Timestamp} + +import org.apache.pinot.connector.spark.v3.datasource.BaseTest +import org.apache.spark.sql.sources._ + +/** + * Test filter conversions => Spark filter to SQL where clause + */ +class FilterPushDownTest extends BaseTest { + + private val filters: Array[Filter] = Array( + EqualTo("attr1", 1), + In("attr2", Array("1", "2", "'5'")), + LessThan("attr3", 1), + LessThanOrEqual("attr4", 3), + GreaterThan("attr5", 10), + GreaterThanOrEqual("attr6", 15), + Not(EqualTo("attr7", "1")), + And(LessThan("attr8", 10), LessThanOrEqual("attr9", 3)), + Or(EqualTo("attr10", "hello"), GreaterThanOrEqual("attr11", 13)), + StringContains("attr12", "pinot"), + In("attr13", Array(10, 20)), + EqualNullSafe("attr20", "123"), + IsNull("attr14"), + IsNotNull("attr15"), + StringStartsWith("attr16", "pinot1"), + StringEndsWith("attr17", "pinot2"), + EqualTo("attr18", Timestamp.valueOf("2020-01-01 00:00:15")), + LessThan("attr19", Date.valueOf("2020-01-01")), + EqualTo("attr21", Seq(1, 2)), + EqualTo("attr22", 10.5d) + ) + + test("Unsupported filters should be filtered") { + val (accepted, postScan) = FilterPushDown.acceptFilters(filters) + + accepted should contain theSameElementsAs filters + postScan should contain theSameElementsAs Seq.empty + } + + test("SQL query should be created from spark filters") { + val whereClause = FilterPushDown.compileFiltersToSqlWhereClause(filters) + val expectedOutput = + s"(attr1 = 1) AND (attr2 IN ('1', '2', '''5''')) AND (attr3 < 1) AND (attr4 <= 3) AND (attr5 > 10) AND " + + s"(attr6 >= 15) AND (NOT (attr7 = '1')) AND ((attr8 < 10) AND (attr9 <= 3)) AND " + + s"((attr10 = 'hello') OR (attr11 >= 13)) AND (attr12 LIKE '%pinot%') AND (attr13 IN (10, 20)) AND " + + s"(NOT (attr20 != '123' OR attr20 IS NULL OR '123' IS NULL) OR (attr20 IS NULL AND '123' IS NULL)) AND " + + s"(attr14 IS NULL) AND (attr15 IS NOT NULL) AND (attr16 LIKE 'pinot1%') AND (attr17 LIKE '%pinot2') AND " + + s"(attr18 = '2020-01-01 00:00:15.0') AND (attr19 < '2020-01-01') AND (attr21 = List(1, 2)) AND " + + s"(attr22 = 10.5)" + + whereClause.get shouldEqual expectedOutput + } + +} diff --git a/pinot-connectors/pom.xml b/pinot-connectors/pom.xml index 75a40c97ba20..4d1cc66c2dbc 100644 --- a/pinot-connectors/pom.xml +++ b/pinot-connectors/pom.xml @@ -40,6 +40,7 @@ pinot-spark-common pinot-spark-2-connector + pinot-spark-3-connector pinot-flink-connector