Sedona 173 delta lake tests #696

Closed · wants to merge 2 commits
754 changes: 754 additions & 0 deletions binder/ApacheSedonaDeltaLake.ipynb

Large diffs are not rendered by default.
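Since the notebook itself is not rendered in the diff, here is a minimal sketch of the Sedona-on-Delta round trip it exercises. The session configuration mirrors Base.scala below; the /tmp path and the generated points are illustrative placeholders, not taken from the notebook.

import org.apache.sedona.core.serde.SedonaKryoRegistrator
import org.apache.sedona.sql.utils.SedonaSQLRegistrator
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .master("local[*]")
  // Kryo registration for Sedona geometry types.
  .config("spark.serializer", classOf[KryoSerializer].getName)
  .config("spark.kryo.registrator", classOf[SedonaKryoRegistrator].getName)
  // Delta Lake SQL extension and catalog.
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()

// Register Sedona's SQL functions (ST_Point, ST_Within, ...) on the session.
SedonaSQLRegistrator.registerAll(spark)

// Write a geometry column to Delta and read it back.
val points = spark.sql("SELECT id, ST_Point(CAST(id AS DOUBLE), 0.0) AS geom FROM range(10)")
points.write.format("delta").save("/tmp/sedona_delta_demo")
spark.read.format("delta").load("/tmp/sedona_delta_demo").show()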

94 changes: 94 additions & 0 deletions example/delta/src/test/scala/org/apache/sedona/delta/Base.scala
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.sedona.delta


import org.apache.sedona.core.serde.SedonaKryoRegistrator
import org.apache.sedona.sql.utils.SedonaSQLRegistrator
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.locationtech.jts.geom.{Coordinate, Geometry, GeometryFactory}
import org.locationtech.jts.io.WKTReader
import org.scalatest.matchers.should.Matchers
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSpec, GivenWhenThen}

import java.io.File
import java.nio.file.Paths
import scala.math.random
import scala.reflect.io.Directory


class Base extends FunSpec with BeforeAndAfterAll with GivenWhenThen with BeforeAndAfterEach with Matchers {

private val warehouseLocation: String = System.getProperty("user.dir") + "/target/"
protected val geometryFactory = new GeometryFactory()

// Delta Lake requires its SQL extension and catalog implementation on the
// session; Sedona requires Kryo registration so geometries serialize efficiently.
val spark: SparkSession = SparkSession
.builder
.master("local[*]")
.config("spark.serializer", classOf[KryoSerializer].getName)
.config("spark.kryo.registrator", classOf[SedonaKryoRegistrator].getName)
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.config("spark.sql.warehouse.dir", warehouseLocation)
.getOrCreate()

// Register Sedona's SQL functions (ST_Point, ST_Within, ...) on the session.
SedonaSQLRegistrator.registerAll(spark)

spark.sparkContext.setLogLevel("ERROR")

// Registers `tableName` in the catalog as an external Delta table backed by
// files under the temporary save path (CREATE TABLE ... AS SELECT).
protected def createTableAsSelect(df: DataFrame, tableName: String): Unit = {
df.createOrReplaceTempView("df")

spark.sql(
s"CREATE TABLE IF NOT EXISTS $tableName USING delta LOCATION '$temporarySavePath/$tableName' AS SELECT * FROM df"
)
}

// Reads the Delta table back directly from its path, bypassing the catalog.
protected def loadDeltaTable(tableName: String): DataFrame = {
spark.sql(s"SELECT * FROM delta.`$temporarySavePath/$tableName`")
}

protected val temporarySavePath: String = Paths.get(
System.getProperty("user.dir"), "delta/src/test/scala/tmp").toString

override def afterEach(): Unit = {
// Drop the on-disk Delta tables between tests; deleteRecursively removes
// the directory itself along with its contents.
new Directory(new File(temporarySavePath)).deleteRecursively()
}

// Point is a Geometry subtype, so the declared return type does the upcast.
protected def createPoint(x: Double, y: Double): Geometry =
geometryFactory.createPoint(new Coordinate(x, y))

protected def produceDeltaPath(tableName: String): String =
s"delta.`${temporarySavePath}/${tableName}`"

protected def produceDeltaTargetPath(tableName: String): String =
s"${temporarySavePath}/${tableName}"

// Random points in the [0, 20) x [0, 20) square, indexed from 1.
protected def createSpatialPointDataFrame(numberOfPoints: Int): DataFrame = {
import spark.implicits._
(1 to numberOfPoints).map(index =>
(index, createPoint(random * 20.0, random * 20.0))
).toDF("index", "geom")
}

protected val wktReader: WKTReader = new WKTReader()
}
103 changes: 103 additions & 0 deletions example/delta/src/test/scala/org/apache/sedona/delta/api/TestCreateTableCommand.scala
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.sedona.delta.api

import org.apache.sedona.delta.Base
import org.apache.spark.sql.functions.expr


class TestCreateTableCommand extends Base {

describe("should create table using delta with geospatial data") {
import spark.implicits._

it("should save geospatial columns") {
Given("geospatial data")
val gdf = createSpatialPointDataFrame(20)
val tableName = "spatial_df"

When("saving to delta lake format")
createTableAsSelect(gdf, tableName)

Then("geospatial columns should be in the delta table")
val loadedTable = loadDeltaTable(tableName)

And("count should be as initial")
loadedTable.count shouldBe gdf.count

And("input dataframe should be the same as the saved one")
val numberOfValidRecords = loadedTable
.alias("left").join(gdf.alias("right"), $"left.index" === $"right.index")
.withColumn("eq", expr("left.geom == right.geom"))
.where("eq == true")
.count

numberOfValidRecords shouldBe gdf.count

}

it("should save non geospatial data only") {
Given("data frame with non geospatial data")
val df = Seq(
(1, "text1"),
(2, "text2")
).toDF("index", "name")
val targetTableName = "non_geospatial"

When("saving as delta table")
createTableAsSelect(df, targetTableName)

Then("result table should be the same as input table")
val loadedTable = loadDeltaTable(targetTableName)

loadedTable.count shouldBe df.count

}

it("should create geospatial table based on transformed table") {
Given("geometry dataframe")
val gdf = createSpatialPointDataFrame(10)
.selectExpr(
"index", "ST_GeoHash(geom, 6) AS geohash"
)

val tableName = "transformed"

When("transforming geometry data frame and save it to delta lake")
createTableAsSelect(gdf, tableName)

Then("data should be properly saved")
val loadedDf = loadDeltaTable(tableName)

And("data should have initial count")
loadedDf.count shouldBe gdf.count

And("point")
val numberOfInEqualElements = loadedDf
.alias("left").join(gdf.alias("right"), $"left.index" === $"right.index")
.selectExpr("left.geohash == right.geohash AS eq")
.where("eq == false")
.count

numberOfInEqualElements shouldBe 0
}
}

}
56 changes: 56 additions & 0 deletions example/delta/src/test/scala/org/apache/sedona/delta/api/TestDDLDeltaLake.scala
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.sedona.delta.api

import io.delta.tables.DeltaTable
import org.apache.sedona.delta.Base
import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT

class TestDDLDeltaLake extends Base {

describe("ddl lifecycle"){
it("should be able to create delta table based on delta api"){
Given("delta table definition")
val deltaTableName = "delta_table"

val deltaTable = DeltaTable.create(spark)
.location(produceDeltaTargetPath(deltaTableName))
.addColumn("identifier", "BIGINT")
// Sedona's GeometryUDT is passed directly as the Spark DataType.
.addColumn("geom", GeometryUDT)

When("executing query")
deltaTable.execute()

Then("delta table with geometry type should be created")
val savedDeltaTable = loadDeltaTable(deltaTableName)
val fields = savedDeltaTable.schema.map(
field => Field(field.dataType.simpleString, field.name)
)

fields should contain theSameElementsAs Seq(
Field("geometry", "geom"), Field("bigint", "identifier")
)

}

}
protected case class Field(tp: String, name: String)
}
131 changes: 131 additions & 0 deletions example/delta/src/test/scala/org/apache/sedona/delta/api/TestDeleteCommand.scala
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.sedona.delta.api

import org.apache.sedona.delta.Base


class TestDeleteCommand extends Base {

import spark.implicits._

describe("testing sql delete statement lifecycle for delta lake with geospatial"){
it("should remove elements based on geospatial conditions") {
Given("geospatial delta lake table")
val deltaTableName = "geospatial"
val expectedNumberOfElements = 100

val pointsTable = createSpatialPointDataFrame(expectedNumberOfElements)
.unionAll(
Seq((1, createPoint(21.0, 52.0))).toDF("index", "geom")
)

createTableAsSelect(pointsTable, deltaTableName)

When("removing records based on geospatial predicate")
spark.sql(
s"DELETE FROM ${produceDeltaPath(deltaTableName)} WHERE ST_X(geom) == 21.0"
)

Then("target table should have appropriate number of records within it")
loadDeltaTable(deltaTableName).count shouldBe expectedNumberOfElements

}

it("should remove elements on geospatial data based on non geospatial conditions"){
Given("geospatial data frame")
val deltaTableName = "geospatial_non_geospatial_predicate"
val expectedNumberOfElements = 100
val indexToRemove = 100000

val pointsTable = createSpatialPointDataFrame(expectedNumberOfElements)
.unionAll(
Seq((indexToRemove, createPoint(21.0, 52.0))).toDF("index", "geom")
)

createTableAsSelect(pointsTable, deltaTableName)

When("deleting elements from delta table based on non geospatial predicate")
spark.sql(
s"DELETE FROM ${produceDeltaPath(deltaTableName)} WHERE index == $indexToRemove"
)

Then("count in delta table should be as expected")
loadDeltaTable(deltaTableName).count shouldBe expectedNumberOfElements
}

it("should remove elements based on geospatial data") {
Given("geospatial delta table")
val geospatialDataFrame = Seq(
(1, wktReader.read("POINT(21.00 52.00)")),
(2, wktReader.read("POINT(21.00 20.00)")),
(3, wktReader.read("POINT(21.00 23.00)")),
(4, wktReader.read("POINT(21.00 24.00)"))
).toDF("index", "geom")

val expectedElementsCount = 3
val deltaTableName = "geospatial_equality"

createTableAsSelect(geospatialDataFrame, deltaTableName)

When("removing elements based on geometry equality")
spark.sql(
s"DELETE FROM ${produceDeltaPath(deltaTableName)} WHERE geom = ST_Point(21.00, 52.00)"
)

Then("number of elements should be as expected")
loadDeltaTable(deltaTableName).count shouldBe expectedElementsCount
}

it("should remove elements based on geospatial predicate") {
Given("geospatial delta table")
val gdf = Seq(
(1, wktReader.read("POINT(21.064945 52.215713)")),
(2, wktReader.read("POINT(15.241143 52.092268)")),
(3, wktReader.read("POINT(21.310270 54.052040)")),
(4, wktReader.read("POINT(16.282355 53.791566)")),
(5, wktReader.read("POINT(8.485754 51.581370)"))
).toDF("index", "geom")
val deltaTableName = "geospatial_predicate"

createTableAsSelect(gdf, deltaTableName)

And("extent to clip the data")
val extentOfPoland = "POLYGON((14.368894 54.793480, 23.654518 54.903482, 24.074855 49.028561, 14.024982 49.749864, 14.368894 54.793480))"

When("removing elements based on geospatial predicate")
spark.sql(
s"DELETE FROM ${produceDeltaPath(deltaTableName)} WHERE NOT ST_Within(geom, ST_GeomFromText('$extentOfPoland'))"
)

Then("number of records should match expected")
val deltaTable = loadDeltaTable(deltaTableName)

deltaTable.count shouldBe 4

And("ids set should be as given")
deltaTable.select("index").as[Int].collect().toList should contain theSameElementsAs List(
1, 2, 3, 4
)

}

}
}