Skip to content

Commit

Permalink
support oracle datasource (#59)
Browse files Browse the repository at this point in the history
* support oracle datasource

* add configs for oracle

* add example config for oracle
  • Loading branch information
Nicole00 authored Aug 23, 2022
1 parent 3698e58 commit 1ec0788
Show file tree
Hide file tree
Showing 12 changed files with 331 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,7 @@ object Configs {
case "MAXCOMPUTE" => SourceCategory.MAXCOMPUTE
case "CLICKHOUSE" => SourceCategory.CLICKHOUSE
case "POSTGRESQL" => SourceCategory.POSTGRESQL
case "ORACLE" => SourceCategory.ORACLE
case _ => throw new IllegalArgumentException(s"${category} not support")
}
}
Expand Down Expand Up @@ -671,6 +672,16 @@ object Configs {
config.getString("password"),
getOrElse(config, "sentence", null)
)
case SourceCategory.ORACLE =>
OracleConfigEntry(
SourceCategory.ORACLE,
config.getString("url"),
config.getString("driver"),
config.getString("user"),
config.getString("passwd"),
config.getString("table"),
getOrElse(config, "sentence", null)
)
case SourceCategory.KAFKA =>
val intervalSeconds =
if (config.hasPath("interval.seconds")) config.getInt("interval.seconds")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ object SourceCategory extends Enumeration {
val HBASE = Value("HBASE")
val MAXCOMPUTE = Value("MAXCOMPUTE")
val CLICKHOUSE = Value("CLICKHOUSE")
val POSTGRESQL = Value("POSTGRESQL")
val POSTGRESQL = Value("POSTGRESQL")
val ORACLE = Value("ORACLE")

val SOCKET = Value("SOCKET")
val KAFKA = Value("KAFKA")
Expand Down Expand Up @@ -159,27 +160,26 @@ case class MySQLSourceConfigEntry(override val category: SourceCategory.Value,
}

/**
* PostgreSQLSourceConfigEntry
*
* @param category
* @param host
* @param port
* @param database
* @param table
* @param user
* @param password
* @param sentence
*/
* PostgreSQLSourceConfigEntry
*
* @param category
* @param host
* @param port
* @param database
* @param table
* @param user
* @param password
* @param sentence
*/
case class PostgreSQLSourceConfigEntry(override val category: SourceCategory.Value,
host: String,
port: Int,
database: String,
table: String,
user: String,
password: String,
override val sentence: String
)
extends ServerDataSourceConfigEntry {
override val sentence: String)
extends ServerDataSourceConfigEntry {
require(
host.trim.length != 0 && port > 0 && database.trim.length > 0 && table.trim.length > 0 && user.trim.length > 0)

Expand Down Expand Up @@ -296,3 +296,19 @@ case class ClickHouseConfigEntry(override val category: SourceCategory.Value,
s"ClickHouse source {url:$url, user:$user, passwd:$passwd, numPartition:$numPartition, table:$table, sentence:$sentence}"
}
}

/**
  * OracleConfigEntry
  *
  * Connection settings for reading a table from an Oracle database via JDBC.
  *
  * @param category datasource category; expected to be SourceCategory.ORACLE
  * @param url      JDBC url, e.g. "jdbc:oracle:thin:@host:1521:db"
  * @param driver   JDBC driver class name, e.g. "oracle.jdbc.driver.OracleDriver"
  * @param user     database user
  * @param passwd   database password (note: the config key is "passwd")
  * @param table    source table name, optionally schema-qualified ("schema.table")
  * @param sentence optional SQL to run against the loaded table; may be null
  */
case class OracleConfigEntry(override val category: SourceCategory.Value,
                             url: String,
                             driver: String,
                             user: String,
                             passwd: String,
                             table: String,
                             override val sentence: String)
    extends ServerDataSourceConfigEntry {
  // Fail fast on obviously invalid configuration, mirroring the validation
  // done by the other JDBC-backed source entries (e.g. PostgreSQL).
  require(
    url.trim.nonEmpty && driver.trim.nonEmpty && user.trim.nonEmpty && table.trim.nonEmpty)

  override def toString: String = {
    // Mask the password so credentials are not leaked into logs.
    s"Oracle source {url:$url, driver:$driver, user:$user, passwd:***, table:$table, sentence:$sentence}"
  }
}
5 changes: 5 additions & 0 deletions nebula-exchange_spark_2.2/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
    <groupId>com.oracle.database.jdbc</groupId>
    <!-- ojdbc8 is the Java 8 build of the Oracle JDBC driver. ojdbc11 is
         compiled for Java 11+ and cannot be loaded on the Java 8 runtime
         that the Spark 2.x modules are built and run against. -->
    <artifactId>ojdbc8</artifactId>
    <version>21.4.0.0.1</version>
</dependency>
<dependency>
<groupId>com.vesoft</groupId>
<artifactId>exchange-common</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,41 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
import java.io.File
import com.vesoft.exchange.Argument
import com.vesoft.exchange.common.{CheckPointHandler, ErrorHandler}
import com.vesoft.exchange.common.config.{ClickHouseConfigEntry, Configs, DataSourceConfigEntry, FileBaseSourceConfigEntry, HBaseSourceConfigEntry, HiveSourceConfigEntry, JanusGraphSourceConfigEntry, KafkaSourceConfigEntry, MaxComputeConfigEntry, MySQLSourceConfigEntry, Neo4JSourceConfigEntry, PostgreSQLSourceConfigEntry, PulsarSourceConfigEntry, SinkCategory, SourceCategory}
import com.vesoft.nebula.exchange.reader.{CSVReader, ClickhouseReader, HBaseReader, HiveReader, JSONReader, JanusGraphReader, KafkaReader, MaxcomputeReader, MySQLReader, Neo4JReader, ORCReader, ParquetReader, PostgreSQLReader, PulsarReader}
import com.vesoft.exchange.common.config.{
ClickHouseConfigEntry,
Configs,
DataSourceConfigEntry,
FileBaseSourceConfigEntry,
HBaseSourceConfigEntry,
HiveSourceConfigEntry,
JanusGraphSourceConfigEntry,
KafkaSourceConfigEntry,
MaxComputeConfigEntry,
MySQLSourceConfigEntry,
Neo4JSourceConfigEntry,
OracleConfigEntry,
PostgreSQLSourceConfigEntry,
PulsarSourceConfigEntry,
SinkCategory,
SourceCategory
}
import com.vesoft.nebula.exchange.reader.{
CSVReader,
ClickhouseReader,
HBaseReader,
HiveReader,
JSONReader,
JanusGraphReader,
KafkaReader,
MaxcomputeReader,
MySQLReader,
Neo4JReader,
OracleReader,
ORCReader,
PostgreSQLReader,
ParquetReader,
PulsarReader
}
import com.vesoft.exchange.common.processor.ReloadProcessor
import com.vesoft.nebula.exchange.processor.{EdgeProcessor, VerticesProcessor}
import org.apache.log4j.Logger
Expand Down Expand Up @@ -283,6 +316,11 @@ object Exchange {
val reader = new ClickhouseReader(session, clickhouseConfigEntry)
Some(reader.read())
}
case SourceCategory.ORACLE => {
val oracleConfig = config.asInstanceOf[OracleConfigEntry]
val reader = new OracleReader(session, oracleConfig)
Some(reader.read())
}
case _ => {
LOG.error(s"Data source ${config.category} not supported")
None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import com.vesoft.exchange.common.config.{
MaxComputeConfigEntry,
MySQLSourceConfigEntry,
Neo4JSourceConfigEntry,
OracleConfigEntry,
PostgreSQLSourceConfigEntry,
ServerDataSourceConfigEntry
}
Expand Down Expand Up @@ -289,3 +290,30 @@ class ClickhouseReader(override val session: SparkSession,
df
}
}

/**
  * Oracle reader
  *
  * Loads an Oracle table into a DataFrame through Spark's JDBC datasource.
  * When a `sentence` is configured, the loaded table is registered as a temp
  * view under its bare table name and the sentence is executed against it,
  * letting users project or filter the source data.
  */
class OracleReader(override val session: SparkSession, oracleConfig: OracleConfigEntry)
    extends ServerBaseReader(session, oracleConfig.sentence) {
  // Verify the configured driver class is on the classpath; fail fast at
  // construction rather than deep inside the Spark JDBC load.
  Class.forName(oracleConfig.driver)

  override def read(): DataFrame = {
    val df = session.read
      .format("jdbc")
      .option("url", oracleConfig.url)
      .option("dbtable", oracleConfig.table)
      .option("user", oracleConfig.user)
      .option("password", oracleConfig.passwd)
      .option("driver", oracleConfig.driver)
      .load()

    if (oracleConfig.sentence == null) {
      df
    } else {
      // Strip any leading qualifiers so the temp view uses the bare table
      // name. `.last` stays correct for multi-part identifiers such as
      // "db.schema.table", where indexing segment (1) would pick "schema".
      val tableName =
        if (oracleConfig.table.contains(".")) oracleConfig.table.split("\\.").last
        else oracleConfig.table
      df.createOrReplaceTempView(tableName)
      session.sql(oracleConfig.sentence)
    }
  }
}
5 changes: 5 additions & 0 deletions nebula-exchange_spark_2.4/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,11 @@
<artifactId>emr-maxcompute_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
    <groupId>com.oracle.database.jdbc</groupId>
    <!-- ojdbc8 is the Java 8 build of the Oracle JDBC driver. ojdbc11 is
         compiled for Java 11+ and cannot be loaded on the Java 8 runtime
         that the Spark 2.x modules are built and run against. -->
    <artifactId>ojdbc8</artifactId>
    <version>21.4.0.0.1</version>
</dependency>
<dependency>
<groupId>com.vesoft</groupId>
<artifactId>exchange-common</artifactId>
Expand Down
152 changes: 101 additions & 51 deletions nebula-exchange_spark_2.4/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -318,29 +318,52 @@
}

# PostgreSQL
{
name: tag9
type: {
source: postgresql
sink: client
}
user:root
host: "127.0.0.1"
port: "5432"
database: "database"
table: "table"
user: "root"
password: "nebula"
sentence: "select postgre-field0, postgre-field1, postgre-field2 from table"
fields: [postgre-field-0, postgre-field-1, postgre-field-2]
nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
vertex: {
field: postgre-field-0
# policy: "hash"
}
batch: 256
partition: 32
}
{
name: tag9
type: {
source: postgresql
sink: client
}
user:root
host: "127.0.0.1"
port: "5432"
database: "database"
table: "table"
user: "root"
password: "nebula"
sentence: "select postgre-field0, postgre-field1, postgre-field2 from table"
fields: [postgre-field-0, postgre-field-1, postgre-field-2]
nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
vertex: {
field: postgre-field-0
# policy: "hash"
}
batch: 256
partition: 32
}

# Oracle
{
name: tag10
type: {
source: oracle
sink: client
}
url:"jdbc:oracle:thin:@host:1521:db"
driver: "oracle.jdbc.driver.OracleDriver"
user: "root"
passwd: "nebula"
table: "db.table"
sentence: "select oracle-field0, oracle-field1, oracle-field2 from table"
fields: [oracle-field-0, oracle-field-1, oracle-field-2]
nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
vertex: {
field: oracle-field-0
# policy: "hash"
}
batch: 256
partition: 32
}
]

# Processing edges
Expand Down Expand Up @@ -591,33 +614,60 @@
}

# PostgreSQL
{
name: edge-name-8
type: {
source: postgresql
sink: client
}
user:root
host: "127.0.0.1"
port: "5432"
database: "database"
table: "table"
user: "root"
password: "nebula"
sentence: "select postgre-field0, postgre-field1, postgre-field2 from table"
fields: [postgre-field-0, postgre-field-1, postgre-field-2]
nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
source: {
field: mysql-field-0
# policy: "hash"
}
source: {
field: mysql-field-0
# policy: "hash"
}
ranking: postgre-field-1
batch: 256
partition: 32
}
{
name: edge-name-8
type: {
source: postgresql
sink: client
}
user:root
host: "127.0.0.1"
port: "5432"
database: "database"
table: "table"
user: "root"
password: "nebula"
sentence: "select postgre-field0, postgre-field1, postgre-field2 from table"
fields: [postgre-field-0, postgre-field-1, postgre-field-2]
nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
source: {
field: mysql-field-0
# policy: "hash"
}
target: {
field: mysql-field-0
# policy: "hash"
}
ranking: postgre-field-1
batch: 256
partition: 32
}

# Oracle
{
name: edge-name-9
type: {
source: oracle
sink: client
}
url:"jdbc:oracle:thin:@host:1521:db"
driver: "oracle.jdbc.driver.OracleDriver"
user: "root"
passwd: "nebula"
table: "db.table"
sentence: "select oracle-field0, oracle-field1, oracle-field2 from table"
fields: [oracle-field-0, oracle-field-1, oracle-field-2]
nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
source: {
field: oracle-field-0
# policy: "hash"
}
target: {
field: oracle-field-1
}
ranking: oracle-field-2
batch: 256
partition: 32
}
]
}
Loading

0 comments on commit 1ec0788

Please sign in to comment.