support kafka -> structured streaming data process (#43)
* master → origin/master
cb39a3e0(Merge branch 'master' of https://github.com/riverzzz/nebula-exchange)
6587cbd6(Merge branch 'master' of https://github.com/riverzzz/nebula-exchange)
53e37ee7(Merge branch 'master' of https://github.com/riverzzz/nebula-exchange)
261677e2(add fields, parse kafka json data)
47eea8e5(Merge branch 'master' of https://github.com/riverzzz/nebula-spark-utils)
24ae4250(Merge pull request #166 from Nicole00/readme)
a8fd6623(Merge branch 'master' into readme)
6c2700f4(Merge pull request #164 from Thericecookers/master)
d1724aa5(format)
a13c2415(format)
6196e6d8(add repo transfer note)
f41f41dc(Merge branch 'master' into master)
4cd75f07(Merge pull request #165 from Nicole00/louvain)
8a837127(Merge branch 'master' into louvain)
40dbe339(fix louvain's result format)
138a1a11(bugfix: Reverse edge has wrong partitionId)
4d184f1e(remove redundant pom dependencies to avoid errors)
93a8ff74(support parsing kafka data; support configuring offsets and a pull limit; streaming and offline import parsing share the same parsing path; pom config supports stream processing)

* format the code style
remove value data
extract function
riverzzz authored Dec 23, 2021
1 parent 14f29ef commit d229bee
Showing 7 changed files with 183 additions and 119 deletions.
23 changes: 22 additions & 1 deletion nebula-exchange/pom.xml
@@ -37,6 +37,7 @@
<commons-codec.version>1.14</commons-codec.version>
<hadoop.version>2.6.1</hadoop.version>
<hbase.version>1.2.0</hbase.version>
<kafka.version>2.0.0</kafka.version>
</properties>

<build>
@@ -132,7 +133,7 @@
<createDependencyReducedPom>false</createDependencyReducedPom>
<artifactSet>
<excludes>
<exclude>org.apache.spark:*</exclude>
<!--<exclude>org.apache.spark:*</exclude>-->
<exclude>org.apache.hadoop:*</exclude>
<exclude>org.apache.hive:*</exclude>
<exclude>log4j:log4j</exclude>
@@ -254,6 +255,17 @@
</build>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.streamnative.connectors</groupId>
<artifactId>pulsar-spark-connector_2.11</artifactId>
@@ -263,6 +275,7 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>snappy-java</artifactId>
@@ -362,6 +375,7 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>snappy-java</artifactId>
@@ -401,11 +415,13 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>commons-codec</artifactId>
@@ -455,12 +471,17 @@
<artifactId>commons-io</artifactId>
<groupId>commons-io</groupId>
</exclusion>
<exclusion>
<artifactId>hive-metastore</artifactId>
<groupId>org.spark-project.hive</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>guava</artifactId>
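The new spark-sql-kafka-0-10 dependency is what lets Exchange consume Kafka through Structured Streaming. Below is a minimal read-path sketch, not the project's actual entry point: the broker address, topic name, and SparkSession setup are hypothetical, and the option keys are the standard Spark Kafka source options that the new config entries map onto (service → kafka.bootstrap.servers, topic → subscribe, plus the optional startingOffsets and maxOffsetsPerTrigger).

```scala
import org.apache.spark.sql.SparkSession

object KafkaReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafka-read-sketch").master("local[*]").getOrCreate()

    // Read the topic as an unbounded streaming DataFrame.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // hypothetical broker address
      .option("subscribe", "example-topic")                // hypothetical topic name
      .option("startingOffsets", "latest")                 // same default as DEFAULT_KAFKA_STARTINGOFFSETS
      .option("maxOffsetsPerTrigger", 1000L)               // optional per-micro-batch pull limit
      .load()

    // Kafka exposes key/value as binary columns; the JSON payload is usually
    // cast to string before field extraction downstream.
    val payload = df.selectExpr("CAST(value AS STRING) AS value")
    payload.printSchema()
    spark.stop()
  }
}
```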
@@ -231,25 +231,26 @@ case class Configs(databaseConfig: DataBaseConfigEntry,
object Configs {
private[this] val LOG = Logger.getLogger(this.getClass)

private[this] val DEFAULT_CONNECTION_TIMEOUT = Integer.MAX_VALUE
private[this] val DEFAULT_CONNECTION_RETRY = 3
private[this] val DEFAULT_EXECUTION_RETRY = 3
private[this] val DEFAULT_EXECUTION_TIMEOUT = Integer.MAX_VALUE
private[this] val DEFAULT_EXECUTION_INTERVAL = 3000
private[this] val DEFAULT_ERROR_OUTPUT_PATH = "/tmp/nebula.writer.errors/"
private[this] val DEFAULT_ERROR_MAX_BATCH_SIZE = Int.MaxValue
private[this] val DEFAULT_RATE_LIMIT = 1024
private[this] val DEFAULT_RATE_TIMEOUT = 100
private[this] val DEFAULT_ENABLE_SSL = false
private[this] val DEFAULT_SSL_SIGN_TYPE = "CA"
private[this] val DEFAULT_EDGE_RANKING = 0L
private[this] val DEFAULT_BATCH = 2
private[this] val DEFAULT_PARTITION = -1
private[this] val DEFAULT_CHECK_POINT_PATH = None
private[this] val DEFAULT_LOCAL_PATH = None
private[this] val DEFAULT_REMOTE_PATH = None
private[this] val DEFAULT_STREAM_INTERVAL = 30
private[this] val DEFAULT_PARALLEL = 1
private[this] val DEFAULT_CONNECTION_TIMEOUT = Integer.MAX_VALUE
private[this] val DEFAULT_CONNECTION_RETRY = 3
private[this] val DEFAULT_EXECUTION_RETRY = 3
private[this] val DEFAULT_EXECUTION_TIMEOUT = Integer.MAX_VALUE
private[this] val DEFAULT_EXECUTION_INTERVAL = 3000
private[this] val DEFAULT_ERROR_OUTPUT_PATH = "/tmp/nebula.writer.errors/"
private[this] val DEFAULT_ERROR_MAX_BATCH_SIZE = Int.MaxValue
private[this] val DEFAULT_RATE_LIMIT = 1024
private[this] val DEFAULT_RATE_TIMEOUT = 100
private[this] val DEFAULT_ENABLE_SSL = false
private[this] val DEFAULT_SSL_SIGN_TYPE = "CA"
private[this] val DEFAULT_EDGE_RANKING = 0L
private[this] val DEFAULT_BATCH = 2
private[this] val DEFAULT_PARTITION = -1
private[this] val DEFAULT_CHECK_POINT_PATH = None
private[this] val DEFAULT_LOCAL_PATH = None
private[this] val DEFAULT_REMOTE_PATH = None
private[this] val DEFAULT_STREAM_INTERVAL = 30
private[this] val DEFAULT_KAFKA_STARTINGOFFSETS = "latest"
private[this] val DEFAULT_PARALLEL = 1

/**
*
@@ -659,10 +660,18 @@ object Configs {
val intervalSeconds =
if (config.hasPath("interval.seconds")) config.getInt("interval.seconds")
else DEFAULT_STREAM_INTERVAL
val startingOffsets =
if (config.hasPath("startingOffsets")) config.getString("startingOffsets")
else DEFAULT_KAFKA_STARTINGOFFSETS
val maxOffsetsPerTrigger =
if (config.hasPath("maxOffsetsPerTrigger")) Some(config.getLong("maxOffsetsPerTrigger"))
else None
KafkaSourceConfigEntry(SourceCategory.KAFKA,
intervalSeconds,
config.getString("service"),
config.getString("topic"))
config.getString("topic"),
startingOffsets,
maxOffsetsPerTrigger)
case SourceCategory.PULSAR =>
val options =
config.getObject("options").unwrapped.asScala.map(x => x._1 -> x._2.toString).toMap
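Both new keys are optional: startingOffsets falls back to "latest" and maxOffsetsPerTrigger to None when absent. A small standalone sketch of the same hasPath/getString/getLong pattern with Typesafe Config, using a hypothetical kafka source block inlined as a string:

```scala
import com.typesafe.config.ConfigFactory

object KafkaOptionParsingSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical kafka source block; only service and topic are mandatory,
    // startingOffsets and maxOffsetsPerTrigger are the new optional keys.
    val config = ConfigFactory.parseString(
      """
        |service = "localhost:9092"
        |topic = "example-topic"
        |startingOffsets = "earliest"
        |maxOffsetsPerTrigger = 5000
      """.stripMargin)

    // Same fallback logic as the new branch in Configs.
    val startingOffsets =
      if (config.hasPath("startingOffsets")) config.getString("startingOffsets") else "latest"
    val maxOffsetsPerTrigger =
      if (config.hasPath("maxOffsetsPerTrigger")) Some(config.getLong("maxOffsetsPerTrigger")) else None

    println(s"startingOffsets=$startingOffsets, maxOffsetsPerTrigger=$maxOffsetsPerTrigger")
  }
}
```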
@@ -162,16 +162,20 @@ case class MySQLSourceConfigEntry(override val category: SourceCategory.Value,
*
* @param server
* @param topic
* @param startingOffsets
* @param maxOffsetsPerTrigger
*/
case class KafkaSourceConfigEntry(override val category: SourceCategory.Value,
override val intervalSeconds: Int,
server: String,
topic: String)
topic: String,
startingOffsets: String,
maxOffsetsPerTrigger: Option[Long]=None)
extends StreamingDataSourceConfigEntry {
require(server.trim.nonEmpty && topic.trim.nonEmpty)

override def toString: String = {
s"Kafka source server: ${server} topic:${topic}"
s"Kafka source server: ${server} topic:${topic} startingOffsets:${startingOffsets} maxOffsetsPerTrigger:${maxOffsetsPerTrigger}"
}
}

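For illustration, here is a simplified, standalone copy of the new case-class shape (the SourceCategory/intervalSeconds fields and the StreamingDataSourceConfigEntry base trait are omitted, and all values are made up) showing the require guard on server/topic and the enriched toString:

```scala
// Simplified stand-in for the real KafkaSourceConfigEntry, illustration only.
case class KafkaSourceConfigEntrySketch(server: String,
                                        topic: String,
                                        startingOffsets: String = "latest",
                                        maxOffsetsPerTrigger: Option[Long] = None) {
  require(server.trim.nonEmpty && topic.trim.nonEmpty)

  override def toString: String =
    s"Kafka source server: $server topic:$topic startingOffsets:$startingOffsets maxOffsetsPerTrigger:$maxOffsetsPerTrigger"
}

object KafkaSourceConfigEntrySketchDemo {
  def main(args: Array[String]): Unit = {
    val entry = KafkaSourceConfigEntrySketch("localhost:9092", "example-topic",
                                             maxOffsetsPerTrigger = Some(1000L))
    println(entry) // maxOffsetsPerTrigger renders as Some(1000)

    // An empty server or topic fails fast:
    // KafkaSourceConfigEntrySketch("", "example-topic")  -> IllegalArgumentException
  }
}
```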
@@ -274,50 +274,27 @@ class EdgeProcessor(data: DataFrame,
}
}
} else {
val streamFlag = data.isStreaming
val edgeFrame = data
.map { row =>
var sourceField = if (!edgeConfig.isGeo) {
val sourceIndex = row.schema.fieldIndex(edgeConfig.sourceField)
assert(sourceIndex >= 0 && !row.isNullAt(sourceIndex),
s"source vertexId must exist and cannot be null, your row data is $row")
val value = row.get(sourceIndex).toString
if (value.equals(DEFAULT_EMPTY_VALUE)) "" else value
} else {
val lat = row.getDouble(row.schema.fieldIndex(edgeConfig.latitude.get))
val lng = row.getDouble(row.schema.fieldIndex(edgeConfig.longitude.get))
indexCells(lat, lng).mkString(",")
}
.filter { row => //filter and check row data,if streaming only print log
val sourceFlag = checkField(edgeConfig.sourceField, "source_field", row, edgeConfig.sourcePolicy, streamFlag, isVidStringType)

if (edgeConfig.sourcePolicy.isEmpty) {
// process string type vid
if (isVidStringType) {
sourceField = NebulaUtils.escapeUtil(sourceField).mkString("\"", "", "\"")
} else {
assert(NebulaUtils.isNumic(sourceField),
s"space vidType is int, but your srcId $sourceField is not numeric.")
}
} else {
assert(!isVidStringType,
"only int vidType can use policy, but your vidType is FIXED_STRING.")
}
val targetFlag = checkField(edgeConfig.targetField, "target_field", row, edgeConfig.targetPolicy, streamFlag, isVidStringType)

val targetIndex = row.schema.fieldIndex(edgeConfig.targetField)
assert(targetIndex >= 0 && !row.isNullAt(targetIndex),
s"target vertexId must exist and cannot be null, your row data is $row")
var targetField = row.get(targetIndex).toString
if (targetField.equals(DEFAULT_EMPTY_VALUE)) targetField = ""
if (edgeConfig.targetPolicy.isEmpty) {
// process string type vid
if (isVidStringType) {
targetField = NebulaUtils.escapeUtil(targetField).mkString("\"", "", "\"")
} else {
assert(NebulaUtils.isNumic(targetField),
s"space vidType is int, but your dstId $targetField is not numeric.")
}
} else {
assert(!isVidStringType,
"only int vidType can use policy, but your vidType is FIXED_STRING.")
}
val edgeRankFlag = if (edgeConfig.rankingField.isDefined) {
val index = row.schema.fieldIndex(edgeConfig.rankingField.get)
val ranking = row.get(index).toString
if (!NebulaUtils.isNumic(ranking)) {
printChoice(streamFlag, s"Not support non-Numeric type for ranking field.your row data is $row")
false
} else true
} else true
sourceFlag && targetFlag && edgeRankFlag
}
.map { row =>
val sourceField = processField(edgeConfig.sourceField, "source_field", row, edgeConfig.sourcePolicy, isVidStringType)

val targetField = processField(edgeConfig.targetField, "target_field", row, edgeConfig.targetPolicy, isVidStringType)

val values = for {
property <- fieldKeys if property.trim.length != 0
@@ -326,19 +303,20 @@ class EdgeProcessor(data: DataFrame,
if (edgeConfig.rankingField.isDefined) {
val index = row.schema.fieldIndex(edgeConfig.rankingField.get)
val ranking = row.get(index).toString
assert(NebulaUtils.isNumic(ranking), s"Not support non-Numeric type for ranking field")

Edge(sourceField, targetField, Some(ranking.toLong), values)
} else {
Edge(sourceField, targetField, None, values)
}
}(Encoders.kryo[Edge])

// streaming write
if (data.isStreaming) {
if (streamFlag) {
val streamingDataSourceConfig =
edgeConfig.dataSourceConfigEntry.asInstanceOf[StreamingDataSourceConfigEntry]
edgeFrame.writeStream
val wStream = edgeFrame.writeStream
if (edgeConfig.checkPointPath.isDefined) wStream.option("checkpointLocation", edgeConfig.checkPointPath.get)

wStream
.foreachBatch((edges, batchId) => {
LOG.info(s"${edgeConfig.name} edge start batch ${batchId}.")
edges.foreachPartition(processEachPartition _)
@@ -357,4 +335,46 @@ class EdgeProcessor(data: DataFrame,
for (index <- DEFAULT_MIN_CELL_LEVEL to DEFAULT_MAX_CELL_LEVEL)
yield s2CellId.parent(index).id()
}

private[this] def checkField(field: String, fieldType: String, row: Row, policy: Option[KeyPolicy.Value], streamFlag: Boolean, isVidStringType: Boolean): Boolean = {
val fieldValue = if (edgeConfig.isGeo && "source_field".equals(fieldType)) {
val lat = row.getDouble(row.schema.fieldIndex(edgeConfig.latitude.get))
val lng = row.getDouble(row.schema.fieldIndex(edgeConfig.longitude.get))
Some(indexCells(lat, lng).mkString(","))
} else {
val index = row.schema.fieldIndex(field)
if (index < 0 || row.isNullAt(index)) {
printChoice(streamFlag, s"$fieldType must exist and cannot be null, your row data is $row")
None
} else Some(row.get(index).toString)
}

val idFlag = fieldValue.isDefined
val policyFlag = if (idFlag && policy.isEmpty && !isVidStringType
&& !NebulaUtils.isNumic(fieldValue.get)) {
printChoice(streamFlag, s"space vidType is int, but your $fieldType $fieldValue is not numeric.your row data is $row")
false
} else if (idFlag && policy.isDefined && isVidStringType) {
printChoice(streamFlag, s"only int vidType can use policy, but your vidType is FIXED_STRING.your row data is $row")
false
} else true
idFlag && policyFlag
}

private[this] def processField(field: String, fieldType: String, row: Row, policy: Option[KeyPolicy.Value], isVidStringType: Boolean): String = {
var fieldValue = if (edgeConfig.isGeo && "source_field".equals(fieldType)) {
val lat = row.getDouble(row.schema.fieldIndex(edgeConfig.latitude.get))
val lng = row.getDouble(row.schema.fieldIndex(edgeConfig.longitude.get))
indexCells(lat, lng).mkString(",")
} else {
val index = row.schema.fieldIndex(field)
val value = row.get(index).toString
if (value.equals(DEFAULT_EMPTY_VALUE)) "" else value
}
// process string type vid
if (policy.isEmpty && isVidStringType) {
fieldValue = NebulaUtils.escapeUtil(fieldValue).mkString("\"", "", "\"")
}
fieldValue
}
}
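The streaming branch now sets an optional checkpointLocation on writeStream before handing each micro-batch to the existing partition writer via foreachBatch. A minimal sketch of that write pattern follows; it assumes the Spark 2.x / Scala 2.11 API used elsewhere in this module, the frame/checkpointPath/intervalSeconds parameters stand in for values EdgeProcessor already carries, and the processing-time trigger is an assumption based on the source's intervalSeconds.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

object ForeachBatchWriteSketch {
  def startWrite(frame: DataFrame,
                 checkpointPath: Option[String],
                 intervalSeconds: Int): Unit = {
    val writer = frame.writeStream
    // The checkpoint location is only set when the user configured one,
    // mirroring the conditional added in EdgeProcessor.
    checkpointPath.foreach(path => writer.option("checkpointLocation", path))

    // Each micro-batch arrives as a plain DataFrame, so a batch-oriented
    // partition writer can be reused unchanged inside foreachBatch.
    val handleBatch: (DataFrame, Long) => Unit = (batch, batchId) => {
      println(s"edge batch $batchId contains ${batch.count()} rows")
    }

    writer
      .foreachBatch(handleBatch)
      .trigger(Trigger.ProcessingTime(s"$intervalSeconds seconds"))
      .start()
      .awaitTermination()
  }
}
```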
@@ -5,21 +5,10 @@

package com.vesoft.nebula.exchange.processor

import com.vesoft.nebula.{
Coordinate,
Date,
DateTime,
Geography,
LineString,
NullType,
Point,
Polygon,
PropertyType,
Time,
Value
}
import com.vesoft.nebula.{Coordinate, Date, DateTime, Geography, LineString, NullType, Point, Polygon, PropertyType, Time, Value}
import com.vesoft.nebula.exchange.utils.NebulaUtils.DEFAULT_EMPTY_VALUE
import com.vesoft.nebula.exchange.utils.{HDFSUtils, NebulaUtils}
import org.apache.log4j.Logger
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, LongType, StringType}

@@ -33,6 +22,9 @@ import scala.collection.mutable.ListBuffer
*/
trait Processor extends Serializable {

@transient
private[this] lazy val LOG = Logger.getLogger(this.getClass)

/**
* process dataframe to vertices or edges
*/
@@ -230,4 +222,9 @@ trait Processor extends Serializable {
}
}
}

def printChoice(streamFlag: Boolean, context: String): Unit = {
if (streamFlag) LOG.info(context)
else assert(assertion = false, context)
}
}
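printChoice captures the design choice behind the new filter step: a batch import can fail fast on a bad row, but a long-running streaming job should log the problem and keep consuming. A tiny usage sketch with illustrative messages:

```scala
import org.apache.log4j.Logger

object PrintChoiceSketch {
  private val LOG = Logger.getLogger(this.getClass)

  // Same shape as the new helper in Processor: streaming jobs log and move on,
  // batch jobs fail fast on the same bad record.
  def printChoice(streamFlag: Boolean, context: String): Unit =
    if (streamFlag) LOG.info(context)
    else assert(assertion = false, context)

  def main(args: Array[String]): Unit = {
    printChoice(streamFlag = true, "bad row skipped in streaming mode")  // only logged
    // printChoice(streamFlag = false, "bad row")  // would throw AssertionError in batch mode
  }
}
```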
(diffs for the remaining changed files were not loaded)
