import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.v2.*
import org.apache.spark.sql.execution.datasources.v2.jdbc.*
import java.sql.{Connection, DriverManager}
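
// Connection details for one database, exposed as a Spark SQL catalog:
// each DB is registered as a JDBCTableCatalog named after `name`, with
// aggregate and limit pushdown enabled.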
case class DB(name: String, url: String, driver: String) {
  def buildSparkConf(): SparkConf = new SparkConf()
    .set(s"spark.sql.catalog.$name", classOf[JDBCTableCatalog].getName)
    .set(s"spark.sql.catalog.$name.url", url)
    .set(s"spark.sql.catalog.$name.driver", driver)
    .set(s"spark.sql.catalog.$name.pushDownAggregate", "true")
    .set(s"spark.sql.catalog.$name.pushDownLimit", "true")
}
@main
def main(): Unit = {
  System.setProperty("spark.testing", "true")
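
  // Build an in-memory H2 URL in PostgreSQL compatibility mode; the INIT clause
  // runs the given schema script from the classpath when a connection is opened,
  // and DB_CLOSE_DELAY=-1 keeps the database alive for the lifetime of the JVM.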
  val h2PostgresWithInit = (name: String, file: String) =>
    s"jdbc:h2:mem:$name;DB_CLOSE_DELAY=-1;" +
      s"MODE=PostgreSQL;DATABASE_TO_LOWER=TRUE;DEFAULT_NULL_ORDERING=HIGH;" +
      s"INIT=RUNSCRIPT FROM 'classpath:sql/$file'"

  val db1 = DB(
    name = "db1",
    url = h2PostgresWithInit("db1", "create-person-table.sql"),
    driver = "org.h2.Driver"
  )
  val db2 = DB(
    name = "db2",
    url = h2PostgresWithInit("db2", "create-todos-table.sql"),
    driver = "org.h2.Driver"
  )

  // Force RUNSCRIPT to be executed on each DB
  val connDb1 = DriverManager.getConnection(db1.url)
  val connDb2 = DriverManager.getConnection(db2.url)

  // Set up a fake Hadoop home (hadoop.home.dir) so Spark can run locally
  val tmpdir = System.getProperty("java.io.tmpdir")
  System.setProperty("hadoop.home.dir", tmpdir)

  // Create spark session
  val spark = SparkSession
    .builder()
    .appName("JDBC Federated Example")
    .master("local[*]")
    .config(db1.buildSparkConf())
    .config(db2.buildSparkConf())
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryo.unsafe", "true")
    // .config("spark.sql.codegen.comments", "true")
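    // Enable Adaptive Query Execution and the cost-based optimizer
    // (join reordering, star-schema detection) for the federated joins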
.config("spark.sql.adaptive.enabled", "true")
.config("spark.sql.cbo.enabled", "true")
.config("spark.sql.cbo.joinReorder.dp.star.filter", "true")
.config("spark.sql.cbo.joinReorder.enabled", "true")
.config("spark.sql.cbo.planStats.enabled", "true")
.config("spark.sql.cbo.starSchemaDetection", "true")
.config("spark.sql.ui.explainMode", "extended")
.getOrCreate()
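
  // Sanity check: list the tables exposed through each JDBC catalog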
spark.sql("SHOW TABLES IN db1.public").show()
spark.sql("SHOW TABLES IN db2.public").show()

  // Query person joined with todos
  val query1 =
    """
      | SELECT p.id, p.name, t.id, t.title
      | FROM db1.public.person p
      | JOIN db2.public.todos t
      | ON p.id = t.person_id
      |""".stripMargin
  val df1 = spark.sql(query1)
  spark.time(df1.show())
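
  // Same join with an equality filter on p.id, which Spark can push down to the JDBC source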
  val query2 =
    """
      | SELECT p.id, p.name, t.id, t.title
      | FROM db1.public.person p
      | JOIN db2.public.todos t
      | ON p.id = t.person_id
      | WHERE p.id = 1
      |""".stripMargin
  val df2 = spark.sql(query2)
  spark.time(df2.show())
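
  // Filtered join with a LIMIT; with pushDownLimit enabled the limit can be pushed to the source as well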
  val query3 =
    """
      | SELECT p.id, p.name, t.id, t.title
      | FROM db1.public.person p
      | JOIN db2.public.todos t
      | ON p.id = t.person_id
      | WHERE p.id = 2
      | LIMIT 1
      |""".stripMargin
  val df3 = spark.sql(query3)
  spark.time(df3.show())
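
  // Same filter as query3 but without the LIMIT, for comparing the resulting plans and timings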
  val query4 =
    """
      | SELECT p.id, p.name, t.id, t.title
      | FROM db1.public.person p
      | JOIN db2.public.todos t
      | ON p.id = t.person_id
      | WHERE p.id = 2
      |""".stripMargin
  val df4 = spark.sql(query4)
  spark.time(df4.show())
println("Press any button to stop...")
scala.io.StdIn.readLine()
}