From 17679a1d35392d0b6d456a805271d6d90ff5bd4e Mon Sep 17 00:00:00 2001 From: Harish Butani Date: Tue, 6 Sep 2016 09:13:41 -0700 Subject: [PATCH] in metadata qualify tableNames, so that star schema tables, druid DS, and base tables can be in multiple DBs --- .../apache/spark/sql/CachedTablePattern.scala | 8 +- .../sparklinedata/SparklineDataContext.scala | 28 +++- .../sparklinedata/druid/DefaultSource.scala | 15 +- .../druid/metadata/StarSchemaInfo.scala | 21 +++ .../sparklinedata/druid/client/BaseTest.scala | 101 +++++++------ .../druid/client/MultiDBTest.scala | 140 ++++++++++++++++++ .../druid/client/SelectQueryTest.scala | 2 +- .../druid/client/StarSchemaBaseTest.scala | 2 +- 8 files changed, 258 insertions(+), 59 deletions(-) create mode 100644 src/test/scala/org/sparklinedata/druid/client/MultiDBTest.scala diff --git a/src/main/scala/org/apache/spark/sql/CachedTablePattern.scala b/src/main/scala/org/apache/spark/sql/CachedTablePattern.scala index f8952d1..4a32365 100644 --- a/src/main/scala/org/apache/spark/sql/CachedTablePattern.scala +++ b/src/main/scala/org/apache/spark/sql/CachedTablePattern.scala @@ -20,11 +20,11 @@ package org.apache.spark.sql import java.util.concurrent.locks.ReentrantReadWriteLock import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.logical.{Subquery, Filter, Project, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project, Subquery} import org.apache.spark.sql.execution.columnar.InMemoryRelation - -import org.apache.spark.sql.catalyst.planning.PhysicalOperation.{ReturnType} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation.ReturnType import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.hive.sparklinedata.SparklineDataContext import org.apache.spark.sql.sources.druid.DruidPlanner import scala.collection.mutable.{Map => mMap} @@ -77,7 +77,7 @@ class CachedTablePattern(val sqlContext : SQLContext) extends PredicateHelper { l match { case l if l.isEmpty => Array() case l if l.size == 1 && l(0).trim == "" => Array() - case _ => l.toArray + case _ => l.map(SparklineDataContext.qualifyWithDefault(sqlContext, _)).toArray } } diff --git a/src/main/scala/org/apache/spark/sql/hive/sparklinedata/SparklineDataContext.scala b/src/main/scala/org/apache/spark/sql/hive/sparklinedata/SparklineDataContext.scala index 62a5109..254e922 100644 --- a/src/main/scala/org/apache/spark/sql/hive/sparklinedata/SparklineDataContext.scala +++ b/src/main/scala/org/apache/spark/sql/hive/sparklinedata/SparklineDataContext.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.analysis.OverrideCatalog import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.catalyst.{ParserDialect, TableIdentifier} +import org.apache.spark.sql.catalyst.{ParserDialect, SqlParser, TableIdentifier} import org.apache.spark.sql.execution.CacheManager import org.apache.spark.sql.execution.ui.SQLListener import org.apache.spark.sql.hive.client.{ClientInterface, ClientWrapper} @@ -93,6 +93,30 @@ class SparklineDataContext( new SparklineMetastoreCatalog(metadataHive, this) with OverrideCatalog override protected[sql] lazy val optimizer: Optimizer = new DruidLogicalOptimizer(conf) + + def currentDB = catalog.currentDB +} + +object SparklineDataContext { + + def qualifiedName(sqlContext : SQLContext, + tableName : String) : String = { + + var tId = 
SqlParser.parseTableIdentifier(tableName) + + if (!tId.database.isDefined) { + tId = tId.copy(database = Some(sqlContext.asInstanceOf[SparklineDataContext].currentDB)) + } + s"${tId.database.get}.${tId.table}" + } + + def qualifyWithDefault(sqlContext : SQLContext, + tableName : String) : String = { + + var tId = SqlParser.parseTableIdentifier(tableName) + s"${tId.database.getOrElse("default")}.${tId.table}" + } + } class SparklineMetastoreCatalog(client: ClientInterface, hive: HiveContext) extends @@ -122,4 +146,6 @@ class SparklineMetastoreCatalog(client: ClientInterface, hive: HiveContext) exte // case LogicalRelation(DruidRelation(info, _), _) => info // }.toSeq } + + def currentDB : String = client.currentDatabase } diff --git a/src/main/scala/org/sparklinedata/druid/DefaultSource.scala b/src/main/scala/org/sparklinedata/druid/DefaultSource.scala index ecd074f..01e75e9 100644 --- a/src/main/scala/org/sparklinedata/druid/DefaultSource.scala +++ b/src/main/scala/org/sparklinedata/druid/DefaultSource.scala @@ -19,6 +19,7 @@ package org.sparklinedata.druid import org.apache.spark.Logging import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.hive.sparklinedata.SparklineDataContext import org.apache.spark.sql.sources.{BaseRelation, RelationProvider} import org.json4s._ import org.json4s.jackson.JsonMethods._ @@ -33,11 +34,13 @@ class DefaultSource extends RelationProvider with Logging { import Utils.jsonFormat - val sourceDFName = parameters.getOrElse(SOURCE_DF_PARAM, + var sourceDFName = parameters.getOrElse(SOURCE_DF_PARAM, throw new DruidDataSourceException( s"'$SOURCE_DF_PARAM' must be specified for Druid DataSource") ) + sourceDFName = SparklineDataContext.qualifiedName(sqlContext, sourceDFName) + val sourceDF = sqlContext.table(sourceDFName) val dsName: String = parameters.getOrElse(DRUID_DS_PARAM, @@ -71,11 +74,11 @@ class DefaultSource extends RelationProvider with Logging { val druidHost = parameters.get(DRUID_HOST_PARAM).getOrElse(DEFAULT_DRUID_HOST) - val starSchemaInfo = - parameters.get(STAR_SCHEMA_INFO_PARAM).map(parse(_).extract[StarSchemaInfo]).orElse( - throw new DruidDataSourceException( - s"'$STAR_SCHEMA_INFO_PARAM' must be specified for Druid DataSource") - ).get + var starSchemaInfo = + parameters.get(STAR_SCHEMA_INFO_PARAM).map(parse(_).extract[StarSchemaInfo]). 
+ getOrElse(StarSchemaInfo(sourceDFName)) + + starSchemaInfo = StarSchemaInfo.qualifyTableNames(sqlContext, starSchemaInfo) val ss = StarSchema(sourceDFName, starSchemaInfo)(sqlContext) if (ss.isLeft) { diff --git a/src/main/scala/org/sparklinedata/druid/metadata/StarSchemaInfo.scala b/src/main/scala/org/sparklinedata/druid/metadata/StarSchemaInfo.scala index cf3ef30..8841768 100644 --- a/src/main/scala/org/sparklinedata/druid/metadata/StarSchemaInfo.scala +++ b/src/main/scala/org/sparklinedata/druid/metadata/StarSchemaInfo.scala @@ -19,6 +19,7 @@ package org.sparklinedata.druid.metadata import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.hive.sparklinedata.SparklineDataContext import scala.annotation.tailrec import scala.collection.mutable.ArrayBuffer @@ -32,6 +33,17 @@ import scala.collection.mutable.ArrayBuffer */ case class StarSchemaInfo(factTable : String, relations : StarRelationInfo*) +object StarSchemaInfo { + + def qualifyTableNames(sqlContext : SQLContext, + sSI : StarSchemaInfo) : StarSchemaInfo = { + StarSchemaInfo( + SparklineDataContext.qualifiedName(sqlContext, sSI.factTable), + sSI.relations.map(StarRelationInfo.qualifyTableNames(sqlContext, _)):_* + ) + } +} + /** * Represents how 2 tables in a StarSchema are related. * @param leftTable @@ -62,6 +74,15 @@ object StarRelationInfo { new StarRelationInfo(leftTable, rightTable, FunctionalDependencyType.ManyToOne, joinCondition.map(t => EqualityCondition(t._1, t._2))) + + def qualifyTableNames(sqlContext : SQLContext, + sRI : StarRelationInfo) : StarRelationInfo = { + sRI.copy( + leftTable = SparklineDataContext.qualifiedName(sqlContext, sRI.leftTable), + rightTable = SparklineDataContext.qualifiedName(sqlContext, sRI.rightTable) + ) + } + } case class EqualityCondition(leftAttribute : String, rightAttribute : String) diff --git a/src/test/scala/org/sparklinedata/druid/client/BaseTest.scala b/src/test/scala/org/sparklinedata/druid/client/BaseTest.scala index 1017f14..06516d6 100644 --- a/src/test/scala/org/sparklinedata/druid/client/BaseTest.scala +++ b/src/test/scala/org/sparklinedata/druid/client/BaseTest.scala @@ -82,21 +82,22 @@ abstract class BaseTest extends fixture.FunSuite with DruidQueryChecks with | } """.stripMargin.replace('\n', ' ') - val starSchema = - """ + def starSchema(factDB : String = "default", + dimDB : String = "default") = + s""" |{ - | "factTable" : "lineitem", + | "factTable" : "$factDB.lineitem", | "relations" : [ { - | "leftTable" : "lineitem", - | "rightTable" : "orders", + | "leftTable" : "$factDB.lineitem", + | "rightTable" : "$dimDB.orders", | "relationType" : "n-1", | "joinCondition" : [ { | "leftAttribute" : "l_orderkey", | "rightAttribute" : "o_orderkey" | } ] | }, { - | "leftTable" : "lineitem", - | "rightTable" : "partsupp", + | "leftTable" : "$factDB.lineitem", + | "rightTable" : "$dimDB.partsupp", | "relationType" : "n-1", | "joinCondition" : [ { | "leftAttribute" : "l_partkey", @@ -106,56 +107,56 @@ abstract class BaseTest extends fixture.FunSuite with DruidQueryChecks with | "rightAttribute" : "ps_suppkey" | } ] | }, { - | "leftTable" : "partsupp", - | "rightTable" : "part", + | "leftTable" : "$dimDB.partsupp", + | "rightTable" : "$dimDB.part", | "relationType" : "n-1", | "joinCondition" : [ { | "leftAttribute" : "ps_partkey", | "rightAttribute" : "p_partkey" | } ] | }, { - | "leftTable" : "partsupp", - | "rightTable" : "supplier", + | "leftTable" : "$dimDB.partsupp", + | "rightTable" : 
"$dimDB.supplier", | "relationType" : "n-1", | "joinCondition" : [ { | "leftAttribute" : "ps_suppkey", | "rightAttribute" : "s_suppkey" | } ] | }, { - | "leftTable" : "orders", - | "rightTable" : "customer", + | "leftTable" : "$dimDB.orders", + | "rightTable" : "$dimDB.customer", | "relationType" : "n-1", | "joinCondition" : [ { | "leftAttribute" : "o_custkey", | "rightAttribute" : "c_custkey" | } ] | }, { - | "leftTable" : "customer", - | "rightTable" : "custnation", + | "leftTable" : "$dimDB.customer", + | "rightTable" : "$dimDB.custnation", | "relationType" : "n-1", | "joinCondition" : [ { | "leftAttribute" : "c_nationkey", | "rightAttribute" : "cn_nationkey" | } ] | }, { - | "leftTable" : "custnation", - | "rightTable" : "custregion", + | "leftTable" : "$dimDB.custnation", + | "rightTable" : "$dimDB.custregion", | "relationType" : "n-1", | "joinCondition" : [ { | "leftAttribute" : "cn_regionkey", | "rightAttribute" : "cr_regionkey" | } ] | }, { - | "leftTable" : "supplier", - | "rightTable" : "suppnation", + | "leftTable" : "$dimDB.supplier", + | "rightTable" : "$dimDB.suppnation", | "relationType" : "n-1", | "joinCondition" : [ { | "leftAttribute" : "s_nationkey", | "rightAttribute" : "sn_nationkey" | } ] | }, { - | "leftTable" : "suppnation", - | "rightTable" : "suppregion", + | "leftTable" : "$dimDB.suppnation", + | "rightTable" : "$dimDB.suppregion", | "relationType" : "n-1", | "joinCondition" : [ { | "leftAttribute" : "sn_regionkey", @@ -165,19 +166,8 @@ abstract class BaseTest extends fixture.FunSuite with DruidQueryChecks with |} """.stripMargin.replace('\n', ' ') - override def beforeAll() = { - - System.setProperty("user.timezone", "UTC") - TimeZone.setDefault(TimeZone.getTimeZone("UTC")) - DateTimeZone.setDefault(DateTimeZone.forID("UTC")) - TestHive.setConf(DruidPlanner.TZ_ID.key, "UTC") - - TestHive.sparkContext.setLogLevel("INFO") - - register(TestHive) - // DruidPlanner(TestHive) - - val cT = s"""CREATE TABLE if not exists orderLineItemPartSupplierBase(o_orderkey integer, + val olFlatCreateTable = + s"""CREATE TABLE if not exists orderLineItemPartSupplierBase(o_orderkey integer, o_custkey integer, o_orderstatus string, o_totalprice double, o_orderdate string, o_orderpriority string, o_clerk string, @@ -200,19 +190,12 @@ abstract class BaseTest extends fixture.FunSuite with DruidQueryChecks with OPTIONS (path "src/test/resources/tpch/datascale1/orderLineItemPartSupplierCustomer.small", header "false", delimiter "|")""".stripMargin - println(cT) - sql(cT) - - TestHive.table("orderLineItemPartSupplierBase").cache() - - TestHive.setConf(DruidPlanner.SPARKLINEDATA_CACHE_TABLES_TOCHECK.key, - "orderLineItemPartSupplierBase") - - // sql("select * from orderLineItemPartSupplierBase limit 10").show(10) - - val cTOlap = s"""CREATE TABLE if not exists orderLineItemPartSupplier + def olDruidDS(db : String = "default", + table : String = "orderLineItemPartSupplierBase", + dsName : String = "orderLineItemPartSupplier") = + s"""CREATE TABLE if not exists $dsName USING org.sparklinedata.druid - OPTIONS (sourceDataframe "orderLineItemPartSupplierBase", + OPTIONS (sourceDataframe "$db.$table", timeDimensionColumn "l_shipdate", druidDatasource "tpch", druidHost "localhost", @@ -223,6 +206,32 @@ abstract class BaseTest extends fixture.FunSuite with DruidQueryChecks with functionalDependencies '$functionalDependencies', starSchema '$flatStarSchema')""".stripMargin + override def beforeAll() = { + + System.setProperty("user.timezone", "UTC") + TimeZone.setDefault(TimeZone.getTimeZone("UTC")) + 
DateTimeZone.setDefault(DateTimeZone.forID("UTC")) + TestHive.setConf(DruidPlanner.TZ_ID.key, "UTC") + + TestHive.sparkContext.setLogLevel("INFO") + + register(TestHive) + // DruidPlanner(TestHive) + + val cT = olFlatCreateTable + + println(cT) + sql(cT) + + TestHive.table("orderLineItemPartSupplierBase").cache() + + TestHive.setConf(DruidPlanner.SPARKLINEDATA_CACHE_TABLES_TOCHECK.key, + "orderLineItemPartSupplierBase") + + // sql("select * from orderLineItemPartSupplierBase limit 10").show(10) + + val cTOlap = olDruidDS() + println(cTOlap) sql(cTOlap) } diff --git a/src/test/scala/org/sparklinedata/druid/client/MultiDBTest.scala b/src/test/scala/org/sparklinedata/druid/client/MultiDBTest.scala new file mode 100644 index 0000000..9b242e3 --- /dev/null +++ b/src/test/scala/org/sparklinedata/druid/client/MultiDBTest.scala @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.sparklinedata.druid.client + +import org.apache.spark.Logging +import org.apache.spark.sql.hive.test.sparklinedata.TestHive +import org.apache.spark.sql.hive.test.sparklinedata.TestHive._ +import org.apache.spark.sql.sources.druid.DruidPlanner +import org.scalatest.BeforeAndAfterAll +import org.sparklinedata.spark.dateTime.dsl.expressions._ + + +object MultiDBStarSchemaTpchQueries { + + import StarSchemaTpchQueries._ + + val q3 = date""" + select + o_orderkey, + sum(l_extendedprice) as price, o_orderdate, + o_shippriority + from default.customer, + |default.orders, + |lineitem + where c_mktsegment = 'BUILDING' and $q3OrderDtPredicate and $q3ShipDtPredicate + |and c_custkey = o_custkey + |and l_orderkey = o_orderkey + group by o_orderkey, + o_orderdate, + o_shippriority + """.stripMargin +} + +class MultiDBTest extends StarSchemaBaseTest with BeforeAndAfterAll with Logging { + + override def beforeAll() = { + super.beforeAll() + + /* + * - orderLineItemPartSupplierBase in DB tpch + * - orderLineItemPartSupplier Druid DS in DB tpch2 + * - lineItem Druid DS in tpch2, all dimension tables in default DB. 
+ */ + + sql("create database tpch") + sql("create database tpch2") + sql("use tpch") + + sql(olFlatCreateTable) + + TestHive.table("orderLineItemPartSupplierBase").cache() + + TestHive.setConf(DruidPlanner.SPARKLINEDATA_CACHE_TABLES_TOCHECK.key, + "tpch.orderLineItemPartSupplierBase,suppregion,suppnation,custregion," + + "custnation,customer,part,supplier,partsupp,orders,lineitembase") + + sql("use tpch2") + val cTOlap = olDruidDS("tpch") + sql(cTOlap) + + sql( + s"""CREATE TABLE if not exists lineitem + USING org.sparklinedata.druid + OPTIONS (sourceDataframe "default.lineItemBase", + timeDimensionColumn "l_shipdate", + druidDatasource "tpch", + druidHost "localhost", + zkQualifyDiscoveryNames "true", + columnMapping '$colMapping', + numProcessingThreadsPerHistorical '1', + functionalDependencies '$functionalDependencies', + starSchema '${starSchema("tpch2", "default")}')""".stripMargin + ) + + /* + * View creation on Spark DataSource fails; Spark defers to Hive to create the view + * Hive thinks the table's schema is (col : String) i.e. 1 String column named 'col' + */ + // sql( + // """ + // |create view orderLineItemPartSupplierBase_view as + // |select o_orderkey, o_custkey + // |from orderLineItemPartSupplierBase + // """.stripMargin) + + // sql(olDruidDS("tpch", + // "orderLineItemPartSupplierBase_view", + // "orderLineItemPartSupplier_view" + // )) + + + } + + override def afterAll(): Unit = { + sql("use default") + super.afterAll() + } + + test("basicAgg", + "select l_returnflag, l_linestatus, " + + "count(*), sum(l_extendedprice) as s, max(ps_supplycost) as m, avg(ps_availqty) as a," + + "count(distinct o_orderkey) " + + "from orderLineItemPartSupplier group by l_returnflag, l_linestatus", + 2, + true + ) + + test("tpchQ3", + MultiDBStarSchemaTpchQueries.q3, + 1, + true, + true, + true + ) + +// test("view", +// """select count(*) +// |from orderLineItemPartSupplier_view +// """.stripMargin, +// 1, +// true, +// true +// ) + +} diff --git a/src/test/scala/org/sparklinedata/druid/client/SelectQueryTest.scala b/src/test/scala/org/sparklinedata/druid/client/SelectQueryTest.scala index 5c552b7..7da85be 100644 --- a/src/test/scala/org/sparklinedata/druid/client/SelectQueryTest.scala +++ b/src/test/scala/org/sparklinedata/druid/client/SelectQueryTest.scala @@ -60,7 +60,7 @@ class SelectQueryTest extends StarSchemaBaseTest with BeforeAndAfterAll with Log numProcessingThreadsPerHistorical '1', nonAggregateQueryHandling "push_project_and_filters", functionalDependencies '$functionalDependencies', - starSchema '${starSchema.replaceAll("lineitem", "lineitem_select")}')""".stripMargin + starSchema '${starSchema().replaceAll("lineitem", "lineitem_select")}')""".stripMargin ) } diff --git a/src/test/scala/org/sparklinedata/druid/client/StarSchemaBaseTest.scala b/src/test/scala/org/sparklinedata/druid/client/StarSchemaBaseTest.scala index 97203ec..0eb812e 100644 --- a/src/test/scala/org/sparklinedata/druid/client/StarSchemaBaseTest.scala +++ b/src/test/scala/org/sparklinedata/druid/client/StarSchemaBaseTest.scala @@ -318,7 +318,7 @@ class StarSchemaBaseTest extends BaseTest with BeforeAndAfterAll with Logging { columnMapping '$colMapping', numProcessingThreadsPerHistorical '1', functionalDependencies '$functionalDependencies', - starSchema '$starSchema')""".stripMargin + starSchema '${starSchema()}')""".stripMargin )
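
Illustrative sketch, not part of the patch: the expected behavior of the two new qualification helpers introduced in SparklineDataContext above, assuming a SparklineDataContext session whose current database is "tpch2" (as set up in MultiDBTest via sql("use tpch2")). The demo function is a hypothetical wrapper used only for illustration.

// Scala sketch -- names taken from the patch; `demo` is hypothetical.
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.sparklinedata.SparklineDataContext

def demo(sqlContext: SQLContext): Unit = {
  // qualifiedName resolves an unqualified name against the session's current database.
  assert(SparklineDataContext.qualifiedName(sqlContext, "lineitem") == "tpch2.lineitem")

  // Names that already carry a database pass through unchanged.
  assert(SparklineDataContext.qualifiedName(sqlContext, "default.orders") == "default.orders")

  // qualifyWithDefault (used for the cached-tables list in CachedTablePattern)
  // falls back to the "default" database rather than the current one.
  assert(SparklineDataContext.qualifyWithDefault(sqlContext, "lineitem") == "default.lineitem")
}

With names qualified this way, a Druid datasource created while "tpch2" is current can reference a base table registered in another database (e.g. sourceDataframe "tpch.orderLineItemPartSupplierBase"), which is the multi-DB layout exercised by MultiDBTest above.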