diff --git a/nebula-exchange/pom.xml b/nebula-exchange/pom.xml
index 8f1b6cb7..ae432333 100644
--- a/nebula-exchange/pom.xml
+++ b/nebula-exchange/pom.xml
@@ -37,6 +37,7 @@
1.14
2.6.1
1.2.0
+ 2.0.0
@@ -132,7 +133,7 @@
false
- org.apache.spark:*
+
org.apache.hadoop:*
org.apache.hive:*
log4j:log4j
@@ -254,6 +255,17 @@
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
+ <version>${spark.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
io.streamnative.connectors
pulsar-spark-connector_2.11
@@ -263,6 +275,7 @@
org.apache.spark
spark-core_2.11
${spark.version}
+ <scope>provided</scope>
snappy-java
@@ -362,6 +375,7 @@
org.apache.spark
spark-sql_2.11
${spark.version}
+ <scope>provided</scope>
snappy-java
@@ -401,11 +415,13 @@
org.apache.spark
spark-catalyst_2.11
${spark.version}
+ <scope>provided</scope>
org.apache.spark
spark-hive_2.11
${spark.version}
+ <scope>provided</scope>
commons-codec
@@ -455,12 +471,17 @@
commons-io
commons-io
+ <exclusion>
+ <artifactId>hive-metastore</artifactId>
+ <groupId>org.spark-project.hive</groupId>
+ </exclusion>
org.apache.spark
spark-yarn_2.11
${spark.version}
+ <scope>provided</scope>
guava
diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/Configs.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/Configs.scala
index 819ee782..41a041d6 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/Configs.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/Configs.scala
@@ -231,25 +231,26 @@ case class Configs(databaseConfig: DataBaseConfigEntry,
object Configs {
private[this] val LOG = Logger.getLogger(this.getClass)
- private[this] val DEFAULT_CONNECTION_TIMEOUT = Integer.MAX_VALUE
- private[this] val DEFAULT_CONNECTION_RETRY = 3
- private[this] val DEFAULT_EXECUTION_RETRY = 3
- private[this] val DEFAULT_EXECUTION_TIMEOUT = Integer.MAX_VALUE
- private[this] val DEFAULT_EXECUTION_INTERVAL = 3000
- private[this] val DEFAULT_ERROR_OUTPUT_PATH = "/tmp/nebula.writer.errors/"
- private[this] val DEFAULT_ERROR_MAX_BATCH_SIZE = Int.MaxValue
- private[this] val DEFAULT_RATE_LIMIT = 1024
- private[this] val DEFAULT_RATE_TIMEOUT = 100
- private[this] val DEFAULT_ENABLE_SSL = false
- private[this] val DEFAULT_SSL_SIGN_TYPE = "CA"
- private[this] val DEFAULT_EDGE_RANKING = 0L
- private[this] val DEFAULT_BATCH = 2
- private[this] val DEFAULT_PARTITION = -1
- private[this] val DEFAULT_CHECK_POINT_PATH = None
- private[this] val DEFAULT_LOCAL_PATH = None
- private[this] val DEFAULT_REMOTE_PATH = None
- private[this] val DEFAULT_STREAM_INTERVAL = 30
- private[this] val DEFAULT_PARALLEL = 1
+ private[this] val DEFAULT_CONNECTION_TIMEOUT = Integer.MAX_VALUE
+ private[this] val DEFAULT_CONNECTION_RETRY = 3
+ private[this] val DEFAULT_EXECUTION_RETRY = 3
+ private[this] val DEFAULT_EXECUTION_TIMEOUT = Integer.MAX_VALUE
+ private[this] val DEFAULT_EXECUTION_INTERVAL = 3000
+ private[this] val DEFAULT_ERROR_OUTPUT_PATH = "/tmp/nebula.writer.errors/"
+ private[this] val DEFAULT_ERROR_MAX_BATCH_SIZE = Int.MaxValue
+ private[this] val DEFAULT_RATE_LIMIT = 1024
+ private[this] val DEFAULT_RATE_TIMEOUT = 100
+ private[this] val DEFAULT_ENABLE_SSL = false
+ private[this] val DEFAULT_SSL_SIGN_TYPE = "CA"
+ private[this] val DEFAULT_EDGE_RANKING = 0L
+ private[this] val DEFAULT_BATCH = 2
+ private[this] val DEFAULT_PARTITION = -1
+ private[this] val DEFAULT_CHECK_POINT_PATH = None
+ private[this] val DEFAULT_LOCAL_PATH = None
+ private[this] val DEFAULT_REMOTE_PATH = None
+ private[this] val DEFAULT_STREAM_INTERVAL = 30
+ private[this] val DEFAULT_KAFKA_STARTINGOFFSETS = "latest"
+ private[this] val DEFAULT_PARALLEL = 1
/**
*
@@ -659,10 +660,18 @@ object Configs {
val intervalSeconds =
if (config.hasPath("interval.seconds")) config.getInt("interval.seconds")
else DEFAULT_STREAM_INTERVAL
+ val startingOffsets =
+ if (config.hasPath("startingOffsets")) config.getString("startingOffsets")
+ else DEFAULT_KAFKA_STARTINGOFFSETS
+ val maxOffsetsPerTrigger =
+ if (config.hasPath("maxOffsetsPerTrigger")) Some(config.getLong("maxOffsetsPerTrigger"))
+ else None
KafkaSourceConfigEntry(SourceCategory.KAFKA,
intervalSeconds,
config.getString("service"),
- config.getString("topic"))
+ config.getString("topic"),
+ startingOffsets,
+ maxOffsetsPerTrigger)
case SourceCategory.PULSAR =>
val options =
config.getObject("options").unwrapped.asScala.map(x => x._1 -> x._2.toString).toMap
diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/SourceConfigs.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/SourceConfigs.scala
index 44a296cb..2a21c1ed 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/SourceConfigs.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/SourceConfigs.scala
@@ -162,16 +162,20 @@ case class MySQLSourceConfigEntry(override val category: SourceCategory.Value,
*
* @param server
* @param topic
+ * @param startingOffsets
+ * @param maxOffsetsPerTrigger
*/
case class KafkaSourceConfigEntry(override val category: SourceCategory.Value,
override val intervalSeconds: Int,
server: String,
- topic: String)
+ topic: String,
+ startingOffsets: String,
+ maxOffsetsPerTrigger: Option[Long] = None)
extends StreamingDataSourceConfigEntry {
require(server.trim.nonEmpty && topic.trim.nonEmpty)
override def toString: String = {
- s"Kafka source server: ${server} topic:${topic}"
+ s"Kafka source server: ${server} topic:${topic} startingOffsets:${startingOffsets} maxOffsetsPerTrigger:${maxOffsetsPerTrigger}"
}
}
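
`KafkaSourceConfigEntry` now carries the two new fields, with `maxOffsetsPerTrigger` defaulting to `None`. A small construction example (broker and topic values are made up):

```scala
import com.vesoft.nebula.exchange.config.{KafkaSourceConfigEntry, SourceCategory}

object KafkaEntrySketch extends App {
  // Illustration only: constructing the extended entry with the new optional fields.
  val kafka = KafkaSourceConfigEntry(
    category = SourceCategory.KAFKA,
    intervalSeconds = 30,
    server = "127.0.0.1:9092",
    topic = "nebula-vertex-topic",
    startingOffsets = "earliest",
    maxOffsetsPerTrigger = Some(10000L)
  )
  println(kafka)
  // Kafka source server: 127.0.0.1:9092 topic:nebula-vertex-topic startingOffsets:earliest maxOffsetsPerTrigger:Some(10000)
}
```
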
diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
index 7e7cb900..425ae802 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
@@ -274,50 +274,27 @@ class EdgeProcessor(data: DataFrame,
}
}
} else {
+ val streamFlag = data.isStreaming
val edgeFrame = data
- .map { row =>
- var sourceField = if (!edgeConfig.isGeo) {
- val sourceIndex = row.schema.fieldIndex(edgeConfig.sourceField)
- assert(sourceIndex >= 0 && !row.isNullAt(sourceIndex),
- s"source vertexId must exist and cannot be null, your row data is $row")
- val value = row.get(sourceIndex).toString
- if (value.equals(DEFAULT_EMPTY_VALUE)) "" else value
- } else {
- val lat = row.getDouble(row.schema.fieldIndex(edgeConfig.latitude.get))
- val lng = row.getDouble(row.schema.fieldIndex(edgeConfig.longitude.get))
- indexCells(lat, lng).mkString(",")
- }
+ .filter { row => // filter and check row data; in streaming mode invalid rows are only logged
+ val sourceFlag = checkField(edgeConfig.sourceField, "source_field", row, edgeConfig.sourcePolicy, streamFlag, isVidStringType)
- if (edgeConfig.sourcePolicy.isEmpty) {
- // process string type vid
- if (isVidStringType) {
- sourceField = NebulaUtils.escapeUtil(sourceField).mkString("\"", "", "\"")
- } else {
- assert(NebulaUtils.isNumic(sourceField),
- s"space vidType is int, but your srcId $sourceField is not numeric.")
- }
- } else {
- assert(!isVidStringType,
- "only int vidType can use policy, but your vidType is FIXED_STRING.")
- }
+ val targetFlag = checkField(edgeConfig.targetField, "target_field", row, edgeConfig.targetPolicy, streamFlag, isVidStringType)
- val targetIndex = row.schema.fieldIndex(edgeConfig.targetField)
- assert(targetIndex >= 0 && !row.isNullAt(targetIndex),
- s"target vertexId must exist and cannot be null, your row data is $row")
- var targetField = row.get(targetIndex).toString
- if (targetField.equals(DEFAULT_EMPTY_VALUE)) targetField = ""
- if (edgeConfig.targetPolicy.isEmpty) {
- // process string type vid
- if (isVidStringType) {
- targetField = NebulaUtils.escapeUtil(targetField).mkString("\"", "", "\"")
- } else {
- assert(NebulaUtils.isNumic(targetField),
- s"space vidType is int, but your dstId $targetField is not numeric.")
- }
- } else {
- assert(!isVidStringType,
- "only int vidType can use policy, but your vidType is FIXED_STRING.")
- }
+ val edgeRankFlag = if (edgeConfig.rankingField.isDefined) {
+ val index = row.schema.fieldIndex(edgeConfig.rankingField.get)
+ val ranking = row.get(index).toString
+ if (!NebulaUtils.isNumic(ranking)) {
+ printChoice(streamFlag, s"Not support non-Numeric type for ranking field.your row data is $row")
+ false
+ } else true
+ } else true
+ sourceFlag && targetFlag && edgeRankFlag
+ }
+ .map { row =>
+ val sourceField = processField(edgeConfig.sourceField, "source_field", row, edgeConfig.sourcePolicy, isVidStringType)
+
+ val targetField = processField(edgeConfig.targetField, "target_field", row, edgeConfig.targetPolicy, isVidStringType)
val values = for {
property <- fieldKeys if property.trim.length != 0
@@ -326,8 +303,6 @@ class EdgeProcessor(data: DataFrame,
if (edgeConfig.rankingField.isDefined) {
val index = row.schema.fieldIndex(edgeConfig.rankingField.get)
val ranking = row.get(index).toString
- assert(NebulaUtils.isNumic(ranking), s"Not support non-Numeric type for ranking field")
-
Edge(sourceField, targetField, Some(ranking.toLong), values)
} else {
Edge(sourceField, targetField, None, values)
@@ -335,10 +310,13 @@ class EdgeProcessor(data: DataFrame,
}(Encoders.kryo[Edge])
// streaming write
- if (data.isStreaming) {
+ if (streamFlag) {
val streamingDataSourceConfig =
edgeConfig.dataSourceConfigEntry.asInstanceOf[StreamingDataSourceConfigEntry]
- edgeFrame.writeStream
+ val wStream = edgeFrame.writeStream
+ if (edgeConfig.checkPointPath.isDefined) wStream.option("checkpointLocation", edgeConfig.checkPointPath.get)
+
+ wStream
.foreachBatch((edges, batchId) => {
LOG.info(s"${edgeConfig.name} edge start batch ${batchId}.")
edges.foreachPartition(processEachPartition _)
@@ -357,4 +335,46 @@ class EdgeProcessor(data: DataFrame,
for (index <- DEFAULT_MIN_CELL_LEVEL to DEFAULT_MAX_CELL_LEVEL)
yield s2CellId.parent(index).id()
}
+
+ private[this] def checkField(field: String, fieldType: String, row: Row, policy: Option[KeyPolicy.Value], streamFlag: Boolean, isVidStringType: Boolean): Boolean = {
+ val fieldValue = if (edgeConfig.isGeo && "source_field".equals(fieldType)) {
+ val lat = row.getDouble(row.schema.fieldIndex(edgeConfig.latitude.get))
+ val lng = row.getDouble(row.schema.fieldIndex(edgeConfig.longitude.get))
+ Some(indexCells(lat, lng).mkString(","))
+ } else {
+ val index = row.schema.fieldIndex(field)
+ if (index < 0 || row.isNullAt(index)) {
+ printChoice(streamFlag, s"$fieldType must exist and cannot be null, your row data is $row")
+ None
+ } else Some(row.get(index).toString)
+ }
+
+ val idFlag = fieldValue.isDefined
+ val policyFlag = if (idFlag && policy.isEmpty && !isVidStringType
+ && !NebulaUtils.isNumic(fieldValue.get)) {
+ printChoice(streamFlag, s"space vidType is int, but your $fieldType $fieldValue is not numeric.your row data is $row")
+ false
+ } else if (idFlag && policy.isDefined && isVidStringType) {
+ printChoice(streamFlag, s"only int vidType can use policy, but your vidType is FIXED_STRING.your row data is $row")
+ false
+ } else true
+ idFlag && policyFlag
+ }
+
+ private[this] def processField(field: String, fieldType: String, row: Row, policy: Option[KeyPolicy.Value], isVidStringType: Boolean): String = {
+ var fieldValue = if (edgeConfig.isGeo && "source_field".equals(fieldType)) {
+ val lat = row.getDouble(row.schema.fieldIndex(edgeConfig.latitude.get))
+ val lng = row.getDouble(row.schema.fieldIndex(edgeConfig.longitude.get))
+ indexCells(lat, lng).mkString(",")
+ } else {
+ val index = row.schema.fieldIndex(field)
+ val value = row.get(index).toString
+ if (value.equals(DEFAULT_EMPTY_VALUE)) "" else value
+ }
+ // process string type vid
+ if (policy.isEmpty && isVidStringType) {
+ fieldValue = NebulaUtils.escapeUtil(fieldValue).mkString("\"", "", "\"")
+ }
+ fieldValue
+ }
}
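
Both processors now share the same streaming-write shape: optionally register a checkpoint location, then handle each micro-batch through `foreachBatch`. A standalone sketch of that pattern (the interval and the batch handler body are placeholders; the real code writes edges or vertices per partition):

```scala
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, SparkSession}

object StreamingWriteSketch {
  // Minimal sketch, assuming `frame` is a streaming DataFrame: the checkpoint location is
  // only set when a path is configured, then each micro-batch is processed in foreachBatch.
  def startStreamingWrite(frame: DataFrame, checkPointPath: Option[String], intervalSeconds: Int): Unit = {
    val wStream = frame.writeStream
    if (checkPointPath.isDefined) wStream.option("checkpointLocation", checkPointPath.get)

    wStream
      .foreachBatch { (batch: DataFrame, batchId: Long) =>
        println(s"start batch $batchId with ${batch.count()} rows") // placeholder handler
      }
      .trigger(Trigger.ProcessingTime(intervalSeconds * 1000L))
      .start()
      .awaitTermination()
  }
}
```
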
diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/Processor.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/Processor.scala
index 7dd7ede4..65cc1004 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/Processor.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/Processor.scala
@@ -5,21 +5,10 @@
package com.vesoft.nebula.exchange.processor
-import com.vesoft.nebula.{
- Coordinate,
- Date,
- DateTime,
- Geography,
- LineString,
- NullType,
- Point,
- Polygon,
- PropertyType,
- Time,
- Value
-}
+import com.vesoft.nebula.{Coordinate, Date, DateTime, Geography, LineString, NullType, Point, Polygon, PropertyType, Time, Value}
import com.vesoft.nebula.exchange.utils.NebulaUtils.DEFAULT_EMPTY_VALUE
import com.vesoft.nebula.exchange.utils.{HDFSUtils, NebulaUtils}
+import org.apache.log4j.Logger
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, LongType, StringType}
@@ -33,6 +22,9 @@ import scala.collection.mutable.ListBuffer
*/
trait Processor extends Serializable {
+ @transient
+ private[this] lazy val LOG = Logger.getLogger(this.getClass)
+
/**
* process dataframe to vertices or edges
*/
@@ -230,4 +222,9 @@ trait Processor extends Serializable {
}
}
}
+
+ def printChoice(streamFlag: Boolean, context: String): Unit = {
+ if (streamFlag) LOG.info(context)
+ else assert(assertion = false, context)
+ }
}
diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala
index b646b2d7..0507577d 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala
@@ -229,42 +229,50 @@ class VerticesProcessor(data: DataFrame,
}
}
} else {
+ val streamFlag = data.isStreaming
val vertices = data
+ .filter { row => // filter and check row data; in streaming mode invalid rows are only logged
+ val index = row.schema.fieldIndex(tagConfig.vertexField)
+ if (index < 0 || row.isNullAt(index)) {
+ printChoice(streamFlag, s"vertexId must exist and cannot be null, your row data is $row")
+ false
+ } else {
+ val vertexId = row.get(index).toString
+ // process int type vid
+ if (tagConfig.vertexPolicy.isEmpty && !isVidStringType && !NebulaUtils.isNumic(vertexId)) {
+ printChoice(streamFlag, s"space vidType is int, but your vertex id $vertexId is not numeric.your row data is $row")
+ false
+ } else if (tagConfig.vertexPolicy.isDefined && isVidStringType) {
+ printChoice(streamFlag, s"only int vidType can use policy, but your vidType is FIXED_STRING.your row data is $row")
+ false
+ } else true
+ }
+ }
.map { row =>
- val vertexID = {
- val index = row.schema.fieldIndex(tagConfig.vertexField)
- assert(index >= 0 && !row.isNullAt(index),
- s"vertexId must exist and cannot be null, your row data is $row")
- var value = row.get(index).toString
- if (value.equals(DEFAULT_EMPTY_VALUE)) { value = "" }
- if (tagConfig.vertexPolicy.isEmpty) {
- // process string type vid
- if (isVidStringType) {
- NebulaUtils.escapeUtil(value).mkString("\"", "", "\"")
- } else {
- // process int type vid
- assert(NebulaUtils.isNumic(value),
- s"space vidType is int, but your vertex id $value is not numeric.")
- value
- }
- } else {
- assert(!isVidStringType,
- "only int vidType can use policy, but your vidType is FIXED_STRING.")
- value
- }
+ val index = row.schema.fieldIndex(tagConfig.vertexField)
+ var vertexId = row.get(index).toString
+ if (vertexId.equals(DEFAULT_EMPTY_VALUE)) {
+ vertexId = ""
+ }
+
+ if (tagConfig.vertexPolicy.isEmpty && isVidStringType) {
+ vertexId = NebulaUtils.escapeUtil(vertexId).mkString("\"", "", "\"")
}
val values = for {
property <- fieldKeys if property.trim.length != 0
} yield extraValueForClient(row, property, fieldTypeMap)
- Vertex(vertexID, values)
+ Vertex(vertexId, values)
}(Encoders.kryo[Vertex])
// streaming write
- if (data.isStreaming) {
+ if (streamFlag) {
val streamingDataSourceConfig =
tagConfig.dataSourceConfigEntry.asInstanceOf[StreamingDataSourceConfigEntry]
- vertices.writeStream
+ val wStream = vertices.writeStream
+ if (tagConfig.checkPointPath.isDefined) wStream.option("checkpointLocation", tagConfig.checkPointPath.get)
+
+ wStream
.foreachBatch((vertexSet, batchId) => {
LOG.info(s"${tagConfig.name} tag start batch ${batchId}.")
vertexSet.foreachPartition(processEachPartition _)
diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/reader/StreamingBaseReader.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/reader/StreamingBaseReader.scala
index 332060ab..25c8fd50 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/reader/StreamingBaseReader.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/reader/StreamingBaseReader.scala
@@ -6,10 +6,8 @@
package com.vesoft.nebula.exchange.reader
import com.vesoft.nebula.exchange.config.{KafkaSourceConfigEntry, PulsarSourceConfigEntry}
-import org.apache.spark.sql.{DataFrame, SparkSession, Row, Encoders}
-import com.alibaba.fastjson.{JSON, JSONObject}
-import org.apache.spark.sql.types.{StructField, StructType, StringType}
-import org.apache.spark.sql.functions.{from_json,col}
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* Spark Streaming
@@ -27,6 +25,7 @@ abstract class StreamingBaseReader(override val session: SparkSession) extends R
*
* @param session
* @param kafkaConfig
+ * @param targetFields
*/
class KafkaReader(override val session: SparkSession,
kafkaConfig: KafkaSourceConfigEntry,
@@ -36,18 +35,24 @@ class KafkaReader(override val session: SparkSession,
require(kafkaConfig.server.trim.nonEmpty && kafkaConfig.topic.trim.nonEmpty && targetFields.nonEmpty)
override def read(): DataFrame = {
+ import org.apache.spark.sql.functions._
import session.implicits._
val fields = targetFields.distinct
- val jsonSchema = StructType(fields.map(field => StructField(field, StringType, true)))
- session.readStream
- .format("kafka")
- .option("kafka.bootstrap.servers", kafkaConfig.server)
- .option("subscribe", kafkaConfig.topic)
- .load()
- .selectExpr("CAST(value AS STRING)")
- .as[(String)]
- .withColumn("value", from_json(col("value"), jsonSchema))
- .select("value.*")
+ val reader =
+ session.readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", kafkaConfig.server)
+ .option("subscribe", kafkaConfig.topic)
+ .option("startingOffsets", kafkaConfig.startingOffsets)
+
+ val maxOffsetsPerTrigger = kafkaConfig.maxOffsetsPerTrigger
+ if (maxOffsetsPerTrigger.isDefined) reader.option("maxOffsetsPerTrigger", maxOffsetsPerTrigger.get)
+
+ reader.load()
+ .select($"value".cast(StringType))
+ .select(json_tuple($"value", fields: _*))
+ .toDF(fields: _*)
+
}
}
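
The reader now casts the Kafka `value` column to string and flattens it into the target columns with `json_tuple`, instead of applying `from_json` over an all-string schema. A batch-mode illustration of that flattening (the field names and JSON payload are made up):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.json_tuple

object JsonTupleSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("json-tuple-sketch").getOrCreate()
  import spark.implicits._

  // Stand-in for the Kafka `value` column after CAST to string.
  val fields = List("src", "dst", "degree")
  val values = Seq("""{"src":"1","dst":"2","degree":"0.8"}""").toDF("value")

  values
    .select(json_tuple($"value", fields: _*))
    .toDF(fields: _*)
    .show() // one row: src=1, dst=2, degree=0.8

  spark.stop()
}
```
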