
support oracle datasource
Nicole00 committed Aug 23, 2022
1 parent 543659e commit 1e8ad19
Showing 10 changed files with 219 additions and 24 deletions.
@@ -26,7 +26,8 @@ object SourceCategory extends Enumeration {
val HBASE = Value("HBASE")
val MAXCOMPUTE = Value("MAXCOMPUTE")
val CLICKHOUSE = Value("CLICKHOUSE")
val POSTGRESQL = Value("POSTGRESQL")
val ORACLE = Value("ORACLE")

val SOCKET = Value("SOCKET")
val KAFKA = Value("KAFKA")
@@ -159,27 +160,26 @@ case class MySQLSourceConfigEntry(override val category: SourceCategory.Value,
}

/**
* PostgreSQLSourceConfigEntry
*
* @param category
* @param host
* @param port
* @param database
* @param table
* @param user
* @param password
* @param sentence
*/
case class PostgreSQLSourceConfigEntry(override val category: SourceCategory.Value,
host: String,
port: Int,
database: String,
table: String,
user: String,
password: String,
override val sentence: String)
extends ServerDataSourceConfigEntry {
require(
host.trim.length != 0 && port > 0 && database.trim.length > 0 && table.trim.length > 0 && user.trim.length > 0)

@@ -296,3 +296,19 @@ case class ClickHouseConfigEntry(override val category: SourceCategory.Value,
s"ClickHouse source {url:$url, user:$user, passwd:$passwd, numPartition:$numPartition, table:$table, sentence:$sentence}"
}
}

/**
* OracleConfigEntry
*/
case class OracleConfigEntry(override val category: SourceCategory.Value,
url: String,
driver: String,
user: String,
passwd: String,
table: String,
override val sentence: String)
extends ServerDataSourceConfigEntry {
override def toString: String = {
s"Oracle source {url:$url, driver:$driver, user:$user, passwd:$passwd, table:$table, sentence:$sentence}"
}
}
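
For reference, a minimal sketch of how this entry could be constructed; the connection values below are placeholders, not part of the commit, and a real job would take them from the application.conf parsed by Configs:

import com.vesoft.exchange.common.config.{OracleConfigEntry, SourceCategory}

// Placeholder connection details, for illustration only.
val oracleEntry = OracleConfigEntry(
  category = SourceCategory.ORACLE,
  url = "jdbc:oracle:thin:@//127.0.0.1:1521/ORCLPDB1", // placeholder URL
  driver = "oracle.jdbc.driver.OracleDriver",
  user = "scott",         // placeholder user
  passwd = "tiger",       // placeholder password
  table = "SCOTT.PERSON", // schema-qualified source table
  sentence = "select id, name, age from PERSON" // optional query over the loaded table
)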
5 changes: 5 additions & 0 deletions nebula-exchange_spark_2.2/pom.xml
@@ -446,6 +446,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.oracle.database.jdbc</groupId>
<artifactId>ojdbc11</artifactId>
<version>21.4.0.0</version>
</dependency>
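Note: ojdbc11 is the Oracle JDBC driver artifact built for JDK 11 and later; builds that have to run on JDK 8 would need the ojdbc8 artifact instead.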
<dependency>
<groupId>com.vesoft</groupId>
<artifactId>exchange-common</artifactId>
@@ -10,8 +10,41 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
import java.io.File
import com.vesoft.exchange.Argument
import com.vesoft.exchange.common.{CheckPointHandler, ErrorHandler}
import com.vesoft.exchange.common.config.{
ClickHouseConfigEntry,
Configs,
DataSourceConfigEntry,
FileBaseSourceConfigEntry,
HBaseSourceConfigEntry,
HiveSourceConfigEntry,
JanusGraphSourceConfigEntry,
KafkaSourceConfigEntry,
MaxComputeConfigEntry,
MySQLSourceConfigEntry,
Neo4JSourceConfigEntry,
OracleConfigEntry,
PostgreSQLSourceConfigEntry,
PulsarSourceConfigEntry,
SinkCategory,
SourceCategory
}
import com.vesoft.nebula.exchange.reader.{
CSVReader,
ClickhouseReader,
HBaseReader,
HiveReader,
JSONReader,
JanusGraphReader,
KafkaReader,
MaxcomputeReader,
MySQLReader,
Neo4JReader,
OracleReader,
ORCReader,
PostgreSQLReader,
ParquetReader,
PulsarReader
}
import com.vesoft.exchange.common.processor.ReloadProcessor
import com.vesoft.nebula.exchange.processor.{EdgeProcessor, VerticesProcessor}
import org.apache.log4j.Logger
@@ -283,6 +316,11 @@ object Exchange {
val reader = new ClickhouseReader(session, clickhouseConfigEntry)
Some(reader.read())
}
case SourceCategory.ORACLE => {
val oracleConfig = config.asInstanceOf[OracleConfigEntry]
val reader = new OracleReader(session, oracleConfig)
Some(reader.read())
}
case _ => {
LOG.error(s"Data source ${config.category} not supported")
None
@@ -14,6 +14,7 @@ import com.vesoft.exchange.common.config.{
MaxComputeConfigEntry,
MySQLSourceConfigEntry,
Neo4JSourceConfigEntry,
OracleConfigEntry,
PostgreSQLSourceConfigEntry,
ServerDataSourceConfigEntry
}
@@ -289,3 +290,30 @@ class ClickhouseReader(override val session: SparkSession,
df
}
}

/**
* Oracle reader
*/
class OracleReader(override val session: SparkSession, oracleConfig: OracleConfigEntry)
extends ServerBaseReader(session, oracleConfig.sentence) {
Class.forName(oracleConfig.driver)
override def read(): DataFrame = {
var df = session.read
.format("jdbc")
.option("url", oracleConfig.url)
.option("dbtable", oracleConfig.table)
.option("user", oracleConfig.user)
.option("password", oracleConfig.passwd)
.option("driver", oracleConfig.driver)
.load()

if (oracleConfig.sentence != null) {
val tableName = if (oracleConfig.table.contains(".")) {
oracleConfig.table.split("\\.")(1)
} else oracleConfig.table
df.createOrReplaceTempView(tableName)
df = session.sql(sentence)
}
df
}
}
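
Given an entry like the hypothetical oracleEntry sketched earlier, read() loads oracleConfig.table through Spark's JDBC source and, when sentence is non-null, registers the result as a temp view named after the unqualified table name ("SCOTT.PERSON" yields view "PERSON"), then evaluates the sentence in Spark against that view. A minimal usage sketch, assuming an existing SparkSession named session:

// Hypothetical usage; oracleEntry is the placeholder entry from the earlier sketch.
val reader = new OracleReader(session, oracleEntry)
val df: DataFrame = reader.read()
df.show(10)

Note that the sentence is evaluated by Spark over the JDBC-backed DataFrame rather than shipped verbatim to Oracle; an alternative the commit does not take is to push the query down by handing a subquery to the dbtable option, e.g. .option("dbtable", "(select id, name, age from PERSON) t"), which Spark's JDBC source accepts.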
5 changes: 5 additions & 0 deletions nebula-exchange_spark_2.4/pom.xml
@@ -533,6 +533,11 @@
<artifactId>emr-maxcompute_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>com.oracle.database.jdbc</groupId>
<artifactId>ojdbc11</artifactId>
<version>21.4.0.0</version>
</dependency>
<dependency>
<groupId>com.vesoft</groupId>
<artifactId>exchange-common</artifactId>
@@ -10,8 +10,41 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
import java.io.File
import com.vesoft.exchange.Argument
import com.vesoft.exchange.common.{CheckPointHandler, ErrorHandler}
import com.vesoft.exchange.common.config.{
ClickHouseConfigEntry,
Configs,
DataSourceConfigEntry,
FileBaseSourceConfigEntry,
HBaseSourceConfigEntry,
HiveSourceConfigEntry,
JanusGraphSourceConfigEntry,
KafkaSourceConfigEntry,
MaxComputeConfigEntry,
MySQLSourceConfigEntry,
Neo4JSourceConfigEntry,
OracleConfigEntry,
PostgreSQLSourceConfigEntry,
PulsarSourceConfigEntry,
SinkCategory,
SourceCategory
}
import com.vesoft.nebula.exchange.reader.{
CSVReader,
ClickhouseReader,
HBaseReader,
HiveReader,
JSONReader,
JanusGraphReader,
KafkaReader,
MaxcomputeReader,
MySQLReader,
Neo4JReader,
OracleReader,
ORCReader,
PostgreSQLReader,
ParquetReader,
PulsarReader
}
import com.vesoft.exchange.common.processor.ReloadProcessor
import com.vesoft.nebula.exchange.processor.{EdgeProcessor, VerticesProcessor}
import org.apache.log4j.Logger
@@ -283,6 +316,11 @@ object Exchange {
val reader = new ClickhouseReader(session, clickhouseConfigEntry)
Some(reader.read())
}
case SourceCategory.ORACLE => {
val oracleConfig = config.asInstanceOf[OracleConfigEntry]
val reader = new OracleReader(session, oracleConfig)
Some(reader.read())
}
case _ => {
LOG.error(s"Data source ${config.category} not supported")
None
@@ -14,6 +14,7 @@ import com.vesoft.exchange.common.config.{
MaxComputeConfigEntry,
MySQLSourceConfigEntry,
Neo4JSourceConfigEntry,
OracleConfigEntry,
PostgreSQLSourceConfigEntry,
ServerDataSourceConfigEntry
}
@@ -103,7 +104,6 @@ class MySQLReader(override val session: SparkSession, mysqlConfig: MySQLSourceCo

/**
* The PostgreSQLReader extends the ServerBaseReader
*/
class PostgreSQLReader(override val session: SparkSession,
postgreConfig: PostgreSQLSourceConfigEntry)
@@ -334,6 +334,7 @@ class ClickhouseReader(override val session: SparkSession,
clickHouseConfigEntry: ClickHouseConfigEntry)
extends ServerBaseReader(session, clickHouseConfigEntry.sentence) {
Class.forName("ru.yandex.clickhouse.ClickHouseDriver")

override def read(): DataFrame = {
val df = session.read
.format("jdbc")
@@ -347,3 +348,30 @@
df
}
}

/**
* Oracle reader
*/
class OracleReader(override val session: SparkSession, oracleConfig: OracleConfigEntry)
extends ServerBaseReader(session, oracleConfig.sentence) {
Class.forName(oracleConfig.driver)
override def read(): DataFrame = {
var df = session.read
.format("jdbc")
.option("url", oracleConfig.url)
.option("dbtable", oracleConfig.table)
.option("user", oracleConfig.user)
.option("password", oracleConfig.passwd)
.option("driver", oracleConfig.driver)
.load()

if (oracleConfig.sentence != null) {
val tableName = if (oracleConfig.table.contains(".")) {
oracleConfig.table.split("\\.")(1)
} else oracleConfig.table
df.createOrReplaceTempView(tableName)
df = session.sql(sentence)
}
df
}
}
6 changes: 5 additions & 1 deletion nebula-exchange_spark_3.0/pom.xml
@@ -380,7 +380,11 @@
<artifactId>pulsar-spark-connector_2.12</artifactId>
<version>${pulsar.version}</version>
</dependency>

<dependency>
<groupId>com.oracle.database.jdbc</groupId>
<artifactId>ojdbc11</artifactId>
<version>21.4.0.0</version>
</dependency>
<dependency>
<groupId>com.vesoft</groupId>
<artifactId>exchange-common</artifactId>
@@ -6,7 +6,6 @@
package com.vesoft.nebula.exchange

import org.apache.spark.sql.{DataFrame, SparkSession}

import java.io.File
import com.vesoft.exchange.Argument
import com.vesoft.exchange.common.{CheckPointHandler, ErrorHandler}
@@ -22,6 +21,7 @@ import com.vesoft.exchange.common.config.{
MaxComputeConfigEntry,
MySQLSourceConfigEntry,
Neo4JSourceConfigEntry,
OracleConfigEntry,
PostgreSQLSourceConfigEntry,
PulsarSourceConfigEntry,
SinkCategory,
@@ -38,6 +38,7 @@ import com.vesoft.nebula.exchange.reader.{
MaxcomputeReader,
MySQLReader,
Neo4JReader,
OracleReader,
ORCReader,
ParquetReader,
PostgreSQLReader,
@@ -314,6 +315,11 @@ object Exchange {
val reader = new ClickhouseReader(session, clickhouseConfigEntry)
Some(reader.read())
}
case SourceCategory.ORACLE => {
val oracleConfig = config.asInstanceOf[OracleConfigEntry]
val reader = new OracleReader(session, oracleConfig)
Some(reader.read())
}
case _ => {
LOG.error(s"Data source ${config.category} not supported")
None
@@ -13,6 +13,7 @@ import com.vesoft.exchange.common.config.{
MaxComputeConfigEntry,
MySQLSourceConfigEntry,
Neo4JSourceConfigEntry,
OracleConfigEntry,
PostgreSQLSourceConfigEntry,
ServerDataSourceConfigEntry
}
@@ -84,8 +85,7 @@ class MySQLReader(override val session: SparkSession, mysqlConfig: MySQLSourceCo
}

/**
* The PostgreSQLReader
*
* @param session
* @param postgreConfig
@@ -237,3 +237,30 @@ class ClickhouseReader(override val session: SparkSession,
df
}
}

/**
* Oracle reader
*/
class OracleReader(override val session: SparkSession, oracleConfig: OracleConfigEntry)
extends ServerBaseReader(session, oracleConfig.sentence) {
Class.forName(oracleConfig.driver)
override def read(): DataFrame = {
var df = session.read
.format("jdbc")
.option("url", oracleConfig.url)
.option("dbtable", oracleConfig.table)
.option("user", oracleConfig.user)
.option("password", oracleConfig.passwd)
.option("driver", oracleConfig.driver)
.load()

if (oracleConfig.sentence != null) {
val tableName = if (oracleConfig.table.contains(".")) {
oracleConfig.table.split("\\.")(1)
} else oracleConfig.table
df.createOrReplaceTempView(tableName)
df = session.sql(sentence)
}
df
}
}
