add kudu and mongo
kid-xiong committed Nov 4, 2018
1 parent 62b6c08 commit 7849e75
Showing 7 changed files with 174 additions and 2 deletions.
2 changes: 2 additions & 0 deletions waterdrop-core/build.sbt
@@ -33,6 +33,8 @@ libraryDependencies ++= Seq(
exclude("org.spark-project.spark", "unused"),
"com.typesafe" % "config" % "1.3.1",
"org.apache.spark" %% "spark-hive" %sparkVersion ,
"org.mongodb.spark" %% "mongo-spark-connector" % sparkVersion,
"org.apache.kudu" %% "kudu-spark2" % "1.7.0",
"com.alibaba" % "QLExpress" % "3.2.0",
"com.alibaba" % "fastjson" % "1.2.47",
"commons-lang" % "commons-lang" % "2.6",
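Both new dependencies use %%, so sbt appends the project's Scala binary version to the artifact name; kudu-spark2 1.7.0 is published for Scala 2.11. Reusing sparkVersion for mongo-spark-connector relies on the connector's 2.x releases tracking Spark's minor versions, and it only resolves when sparkVersion exactly matches a published connector version, so pinning an explicit connector version would be the safer choice.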
3 changes: 2 additions & 1 deletion (output plugin list)
@@ -6,4 +6,5 @@ io.github.interestinglab.waterdrop.output.Jdbc
io.github.interestinglab.waterdrop.output.Kafka
io.github.interestinglab.waterdrop.output.Mysql
io.github.interestinglab.waterdrop.output.S3
io.github.interestinglab.waterdrop.output.Stdout
io.github.interestinglab.waterdrop.output.MongoDB
4 changes: 3 additions & 1 deletion (input plugin list)
@@ -1,4 +1,6 @@
io.github.interestinglab.waterdrop.input.Fake2
io.github.interestinglab.waterdrop.input.Hdfs
io.github.interestinglab.waterdrop.input.File
io.github.interestinglab.waterdrop.input.Hive
io.github.interestinglab.waterdrop.input.MongoDB
io.github.interestinglab.waterdrop.input.Kudu
40 changes: 40 additions & 0 deletions waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/input/Kudu.scala
@@ -0,0 +1,40 @@
package io.github.interestinglab.waterdrop.input

import com.typesafe.config.{Config, ConfigFactory}
import io.github.interestinglab.waterdrop.apis.BaseStaticInput
import org.apache.kudu.spark.kudu._
import org.apache.spark.sql.{Dataset, Row, SparkSession}

class Kudu extends BaseStaticInput {

  var config: Config = ConfigFactory.empty()

  override def setConfig(config: Config): Unit = {
    this.config = config
  }

  override def getConfig(): Config = {
    this.config
  }

  override def checkConfig(): (Boolean, String) = {
    config.hasPath("kudu_master") && config.hasPath("kudu_table") && config.hasPath("table_name") match {
      case true => (true, "")
      case false => (false, "please specify [kudu_master], [kudu_table] and [table_name]")
    }
  }

  override def getDataset(spark: SparkSession): Dataset[Row] = {
    // translate the plugin options into the keys expected by the kudu-spark reader
    val mapConf = Map(
      "kudu.master" -> config.getString("kudu_master"),
      "kudu.table" -> config.getString("kudu_table"))

    val ds = spark.read.format("org.apache.kudu.spark.kudu")
      .options(mapConf).kudu
    // expose the dataset to the rest of the pipeline under [table_name]
    ds.createOrReplaceTempView(config.getString("table_name"))
    ds
  }
}
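For illustration, a minimal sketch of driving this input by hand from spark-shell (with the Waterdrop and kudu-spark2 jars on the classpath, and spark being the session the shell provides); the master address kudu-master:7051 and the table impala::default.events are hypothetical, and in a real job the Waterdrop framework instantiates the plugin from the job config instead:

import com.typesafe.config.ConfigFactory
import io.github.interestinglab.waterdrop.input.Kudu

val input = new Kudu()
input.setConfig(ConfigFactory.parseString(
  """
    |kudu_master = "kudu-master:7051"
    |kudu_table = "impala::default.events"
    |table_name = "events"
  """.stripMargin))

// fail fast if a required option is missing
val (ok, msg) = input.checkConfig()
require(ok, msg)

// getDataset also registers the "events" temp view
input.getDataset(spark)
spark.sql("SELECT COUNT(*) FROM events").show()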
45 changes: 45 additions & 0 deletions waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/input/MongoDB.scala
@@ -0,0 +1,45 @@
package io.github.interestinglab.waterdrop.input

import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig
import com.typesafe.config.{Config, ConfigFactory}
import io.github.interestinglab.waterdrop.apis.BaseStaticInput
import org.apache.spark.sql.{Dataset, Row, SparkSession}

import scala.collection.JavaConversions._

class MongoDB extends BaseStaticInput {

  var config: Config = ConfigFactory.empty()

  override def setConfig(config: Config): Unit = {
    // fall back to MongoShardedPartitioner when the user does not set [partitioner]
    val defaultConfig = ConfigFactory.parseMap(
      Map(
        "partitioner" -> "MongoShardedPartitioner"
      )
    )
    this.config = config.withFallback(defaultConfig)
  }

  override def getConfig(): Config = {
    this.config
  }

  override def checkConfig(): (Boolean, String) = {
    config.hasPath("mongo_uri") && config.hasPath("database") && config.hasPath("collection") && config.hasPath("table_name") match {
      case true => (true, "you can also set [partitioner] for the MongoDB input; default is MongoShardedPartitioner")
      case false => (false, "please specify [mongo_uri], [database], [collection] and [table_name]")
    }
  }

  override def getDataset(spark: SparkSession): Dataset[Row] = {
    val readConfig = ReadConfig(Map(
      "uri" -> config.getString("mongo_uri"),
      "spark.mongodb.input.partitioner" -> config.getString("partitioner"),
      "database" -> config.getString("database"),
      "collection" -> config.getString("collection")))
    val ds = MongoSpark.load(spark, readConfig)
    // register under [table_name], matching the Kudu input and the check above
    ds.createOrReplaceTempView(config.getString("table_name"))
    ds
  }
}
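A similar sketch for the MongoDB input, against a hypothetical standalone server at mongodb://localhost:27017 with database test and collection users; the partitioner is set explicitly here because the MongoShardedPartitioner default only suits sharded clusters:

import com.typesafe.config.ConfigFactory
import io.github.interestinglab.waterdrop.input.MongoDB

val input = new MongoDB()
input.setConfig(ConfigFactory.parseString(
  """
    |mongo_uri = "mongodb://localhost:27017"
    |database = "test"
    |collection = "users"
    |table_name = "users"
    |partitioner = "MongoSamplePartitioner"
  """.stripMargin))

val (ok, msg) = input.checkConfig()
require(ok, msg)

val users = input.getDataset(spark)
users.printSchema()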
37 changes: 37 additions & 0 deletions waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/output/Kudu.scala
@@ -0,0 +1,37 @@
package io.github.interestinglab.waterdrop.output

import com.typesafe.config.{Config, ConfigFactory}
import io.github.interestinglab.waterdrop.apis.BaseOutput
import org.apache.kudu.spark.kudu._
import org.apache.spark.sql.{Dataset, Row}

class Kudu extends BaseOutput {

  var config: Config = ConfigFactory.empty()

  override def setConfig(config: Config): Unit = {
    this.config = config
  }

  override def getConfig(): Config = {
    this.config
  }

  override def checkConfig(): (Boolean, String) = {
    config.hasPath("kudu_master") && config.hasPath("kudu_table") && config.hasPath("table_name") match {
      case true => (true, "")
      case false => (false, "please specify [kudu_master], [kudu_table] and [table_name]")
    }
  }

  override def process(df: Dataset[Row]): Unit = {
    val kuduContext = new KuduContext(config.getString("kudu_master"), df.sparkSession.sparkContext)
    // upsert so rows whose primary key already exists are updated instead of duplicated
    kuduContext.upsertRows(df, config.getString("kudu_table"))
  }
}
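A sketch of the Kudu write path under the same hypothetical names; note that upsertRows does not create tables, so the destination Kudu table must already exist with a primary key matching the DataFrame's key columns:

import com.typesafe.config.ConfigFactory
import io.github.interestinglab.waterdrop.output.Kudu
import spark.implicits._

val df = Seq((1L, "alice"), (2L, "bob")).toDF("id", "name")

val output = new Kudu()
output.setConfig(ConfigFactory.parseString(
  """
    |kudu_master = "kudu-master:7051"
    |kudu_table = "impala::default.users"
    |table_name = "users"
  """.stripMargin))

val (ok, msg) = output.checkConfig()
require(ok, msg)

// rows with an existing primary key are updated, new keys are inserted
output.process(df)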
45 changes: 45 additions & 0 deletions waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/output/MongoDB.scala
@@ -0,0 +1,45 @@
package io.github.interestinglab.waterdrop.output

import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.WriteConfig
import com.typesafe.config.{Config, ConfigFactory}
import io.github.interestinglab.waterdrop.apis.BaseOutput
import org.apache.spark.sql.{Dataset, Row}

import scala.collection.JavaConversions._

class MongoDB extends BaseOutput {

  var config: Config = ConfigFactory.empty()

  override def setConfig(config: Config): Unit = {
    // by default, insert documents rather than replacing existing ones
    val defaultConfig = ConfigFactory.parseMap(
      Map(
        "isReplace" -> "false"
      )
    )
    this.config = config.withFallback(defaultConfig)
  }

  override def getConfig(): Config = {
    this.config
  }

  override def checkConfig(): (Boolean, String) = {
    config.hasPath("mongo_uri") && config.hasPath("database") && config.hasPath("collection") match {
      case true => (true, "")
      case false => (false, "please specify [mongo_uri], [database] and [collection]")
    }
  }

  override def process(df: Dataset[Row]): Unit = {
    val writeConfig = WriteConfig(Map(
      "uri" -> config.getString("mongo_uri"),
      "database" -> config.getString("database"),
      "collection" -> config.getString("collection"),
      // map the plugin's [isReplace] flag onto the connector's replaceDocument option
      "replaceDocument" -> config.getString("isReplace")
    ))
    MongoSpark.save(df, writeConfig)
  }
}
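And a matching sketch for the MongoDB output against the same hypothetical local server; flipping isReplace to "true" maps to the connector's replaceDocument option, replacing whole documents that share an _id instead of updating only the supplied fields:

import com.typesafe.config.ConfigFactory
import io.github.interestinglab.waterdrop.output.MongoDB
import spark.implicits._

val df = Seq(("a", 1), ("b", 2)).toDF("key", "value")

val output = new MongoDB()
output.setConfig(ConfigFactory.parseString(
  """
    |mongo_uri = "mongodb://localhost:27017"
    |database = "test"
    |collection = "kv"
  """.stripMargin))

val (ok, msg) = output.checkConfig()
require(ok, msg)

// isReplace was not set, so the default "false" from setConfig applies
output.process(df)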
