kafka-value-parser (#29)
sworduo authored Nov 25, 2021
1 parent c05dbf7 commit 9c79a1b
Showing 5 changed files with 65 additions and 30 deletions.
@@ -46,8 +46,14 @@ object ErrorHandler {
*/
def save(buffer: ArrayBuffer[String], path: String): Unit = {
LOG.info(s"create reload path $path")
- val fileSystem = FileSystem.get(new Configuration())
- val errors = fileSystem.create(new Path(path))
+ val fileSystem = FileSystem.get(new Configuration())
+ val targetPath = new Path(path)
+ val errors = if (fileSystem.exists(targetPath)) {
+   // For Kafka, the error ngql needs to be appended to the same file instead of overwritten
+   fileSystem.append(targetPath)
+ } else {
+   fileSystem.create(targetPath)
+ }

try {
for (error <- buffer) {
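For reference, a minimal standalone sketch of the append-or-create pattern that ErrorHandler.save now uses (the helper name is hypothetical, and it assumes an HDFS-compatible FileSystem that supports append):

    import java.io.OutputStream
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Hypothetical helper mirroring the change above: append to an existing error file,
    // otherwise create a new one. append() requires a filesystem that supports appends, e.g. HDFS.
    def openErrorOutput(path: String): OutputStream = {
      val fileSystem = FileSystem.get(new Configuration())
      val targetPath = new Path(path)
      if (fileSystem.exists(targetPath)) fileSystem.append(targetPath)
      else fileSystem.create(targetPath)
    }

This keeps the repeated micro-batches of a streaming (Kafka) job writing into one error file instead of overwriting the previous batch's failed statements.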
@@ -131,7 +131,8 @@ object Exchange {
val nebulaKeys = tagConfig.nebulaFields
LOG.info(s"nebula keys: ${nebulaKeys.mkString(", ")}")

- val data = createDataSource(spark, tagConfig.dataSourceConfigEntry)
+ val fields = tagConfig.vertexField :: tagConfig.fields
+ val data = createDataSource(spark, tagConfig.dataSourceConfigEntry, fields)
if (data.isDefined && !c.dry) {
val startTime = System.currentTimeMillis()
val batchSuccess =
@@ -172,7 +173,12 @@ object Exchange {
LOG.info(s"field keys: ${fieldKeys.mkString(", ")}")
val nebulaKeys = edgeConfig.nebulaFields
LOG.info(s"nebula keys: ${nebulaKeys.mkString(", ")}")
- val data = createDataSource(spark, edgeConfig.dataSourceConfigEntry)
+ val fields = if (edgeConfig.rankingField.isDefined) {
+   edgeConfig.rankingField.get :: edgeConfig.sourceField :: edgeConfig.targetField :: edgeConfig.fields
+ } else {
+   edgeConfig.sourceField :: edgeConfig.targetField :: edgeConfig.fields
+ }
+ val data = createDataSource(spark, edgeConfig.dataSourceConfigEntry, fields)
if (data.isDefined && !c.dry) {
val startTime = System.currentTimeMillis()
val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.${edgeConfig.name}")
@@ -228,7 +234,8 @@ object Exchange {
*/
private[this] def createDataSource(
session: SparkSession,
- config: DataSourceConfigEntry
+ config: DataSourceConfigEntry,
+ fields: List[String]
): Option[DataFrame] = {
config.category match {
case SourceCategory.PARQUET =>
@@ -260,7 +267,7 @@ object Exchange {
case SourceCategory.KAFKA => {
val kafkaConfig = config.asInstanceOf[KafkaSourceConfigEntry]
LOG.info(s"""Loading from Kafka ${kafkaConfig.server} and subscribe ${kafkaConfig.topic}""")
- val reader = new KafkaReader(session, kafkaConfig)
+ val reader = new KafkaReader(session, kafkaConfig, fields)
Some(reader.read())
}
case SourceCategory.NEO4J =>
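The extra createDataSource parameter threads the list of source fields down to the reader; only the Kafka branch uses it. A simplified sketch of how that list is assembled for tags and edges, using stand-in case classes rather than the project's real TagConfigEntry/EdgeConfigEntry (assumption: only the members shown here matter for this step):

    // Stand-ins for the config entries referenced in the diff.
    case class TagConfigEntry(vertexField: String, fields: List[String])
    case class EdgeConfigEntry(sourceField: String,
                               targetField: String,
                               rankingField: Option[String],
                               fields: List[String])

    // Tag: prepend the vertex-id field so the Kafka reader also extracts it from the JSON value.
    def tagFields(tag: TagConfigEntry): List[String] =
      tag.vertexField :: tag.fields

    // Edge: prepend source, target and (when present) the ranking field for the same reason.
    def edgeFields(edge: EdgeConfigEntry): List[String] = edge.rankingField match {
      case Some(rank) => rank :: edge.sourceField :: edge.targetField :: edge.fields
      case None       => edge.sourceField :: edge.targetField :: edge.fields
    }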
@@ -340,10 +340,15 @@ object Configs {
hiveEntryOpt = Option(hiveEntry)
}

+ var hasKafka = false

val tags = mutable.ListBuffer[TagConfigEntry]()
val tagConfigs = getConfigsOrNone(config, "tags")
if (tagConfigs.isDefined) {
for (tagConfig <- tagConfigs.get.asScala) {
+ if (hasKafka) {
+   throw new IllegalArgumentException("Can not define any other configs when kafka exists")
+ }
if (!tagConfig.hasPath("name") ||
!tagConfig.hasPath("type.source") ||
!tagConfig.hasPath("type.sink")) {
@@ -377,6 +382,7 @@ object Configs {
val sourceCategory = toSourceCategory(tagConfig.getString("type.source"))
val sourceConfig = dataSourceConfig(sourceCategory, tagConfig, nebulaConfig)
LOG.info(s"Source Config ${sourceConfig}")
+ hasKafka = sourceCategory == SourceCategory.KAFKA

val sinkCategory = toSinkCategory(tagConfig.getString("type.sink"))
val sinkConfig = dataSinkConfig(sinkCategory, nebulaConfig)
@@ -412,6 +418,9 @@ object Configs {
val edgeConfigs = getConfigsOrNone(config, "edges")
if (edgeConfigs.isDefined) {
for (edgeConfig <- edgeConfigs.get.asScala) {
+ if (hasKafka) {
+   throw new IllegalArgumentException("Can not define any other configs when kafka exists")
+ }
if (!edgeConfig.hasPath("name") ||
!edgeConfig.hasPath("type.source") ||
!edgeConfig.hasPath("type.sink")) {
@@ -434,6 +443,7 @@ object Configs {
val sourceCategory = toSourceCategory(edgeConfig.getString("type.source"))
val sourceConfig = dataSourceConfig(sourceCategory, edgeConfig, nebulaConfig)
LOG.info(s"Source Config ${sourceConfig}")
+ hasKafka = sourceCategory == SourceCategory.KAFKA

val sinkCategory = toSinkCategory(edgeConfig.getString("type.sink"))
val sinkConfig = dataSinkConfig(sinkCategory, nebulaConfig)
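The new hasKafka flag makes config parsing fail fast if any tag or edge entry appears after one whose source is Kafka, so a Kafka source effectively has to be the only (or last) entry in the file. A condensed sketch of that loop, with a plain string standing in for SourceCategory (the function name and parameter are illustrative only):

    // Condensed sketch of the validation added above: once a Kafka source has been seen,
    // every subsequent tag/edge entry is rejected.
    def rejectConfigsAfterKafka(sourceCategories: List[String]): Unit = {
      var hasKafka = false
      for (category <- sourceCategories) {
        if (hasKafka)
          throw new IllegalArgumentException("Can not define any other configs when kafka exists")
        hasKafka = category.equalsIgnoreCase("kafka")
      }
    }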
@@ -6,7 +6,10 @@
package com.vesoft.nebula.exchange.reader

import com.vesoft.nebula.exchange.config.{KafkaSourceConfigEntry, PulsarSourceConfigEntry}
- import org.apache.spark.sql.{DataFrame, SparkSession}
+ import org.apache.spark.sql.{DataFrame, SparkSession, Row, Encoders}
+ import com.alibaba.fastjson.{JSON, JSONObject}
+ import org.apache.spark.sql.types.{StructField, StructType, StringType}
+ import org.apache.spark.sql.functions.{from_json, col}

/**
* Spark Streaming
@@ -25,17 +28,26 @@ abstract class StreamingBaseReader(override val session: SparkSession) extends R
* @param session
* @param kafkaConfig
*/
- class KafkaReader(override val session: SparkSession, kafkaConfig: KafkaSourceConfigEntry)
+ class KafkaReader(override val session: SparkSession,
+                   kafkaConfig: KafkaSourceConfigEntry,
+                   targetFields: List[String])
extends StreamingBaseReader(session) {

- require(kafkaConfig.server.trim.nonEmpty && kafkaConfig.topic.trim.nonEmpty)
+ require(kafkaConfig.server.trim.nonEmpty && kafkaConfig.topic.trim.nonEmpty && targetFields.nonEmpty)

override def read(): DataFrame = {
+ import session.implicits._
+ val fields = targetFields.distinct
+ val jsonSchema = StructType(fields.map(field => StructField(field, StringType, true)))
session.readStream
-   .format("kafka")
-   .option("kafka.bootstrap.servers", kafkaConfig.server)
-   .option("subscribe", kafkaConfig.topic)
-   .load()
+   .format("kafka")
+   .option("kafka.bootstrap.servers", kafkaConfig.server)
+   .option("subscribe", kafkaConfig.topic)
+   .load()
+   .selectExpr("CAST(value AS STRING)")
+   .as[(String)]
+   .withColumn("value", from_json(col("value"), jsonSchema))
+   .select("value.*")
}
}

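KafkaReader.read() now casts the Kafka record value to a string and parses it as JSON, producing one nullable string column per configured field. A minimal batch-mode sketch of just that parsing step (the field names and sample payload are made up; the streaming job gets the value column from Kafka via CAST(value AS STRING) instead of a literal):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, from_json}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    val spark = SparkSession.builder().master("local[*]").appName("kafka-value-parser-demo").getOrCreate()
    import spark.implicits._

    // Assumed field names; in Exchange these come from the tag/edge config plus the key fields.
    val targetFields = List("kafka-field-0", "kafka-field-1")
    val jsonSchema   = StructType(targetFields.distinct.map(StructField(_, StringType, true)))

    // Made-up Kafka value for illustration.
    val values = Seq("""{"kafka-field-0": "v0", "kafka-field-1": "v1"}""").toDF("value")

    val parsed = values
      .withColumn("value", from_json(col("value"), jsonSchema))
      .select("value.*")   // one string column per configured field

    parsed.show(false)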
34 changes: 17 additions & 17 deletions nebula-exchange/src/test/resources/application.conf
@@ -224,23 +224,23 @@
}

# KAFKA
- {
-   name: tag7
-   type: {
-     source: kafka
-     sink: client
-   }
-   service: "kafka.service.address"
-   topic: "topic-name"
-   fields: [kafka-field-0, kafka-field-1, kafka-field-2]
-   nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
-   vertex: {
-     field: kafka-field-0
-   }
-   partition: 10
-   batch: 10
-   interval.seconds: 10
- }
+ # {
+ #   name: tag7
+ #   type: {
+ #     source: kafka
+ #     sink: client
+ #   }
+ #   service: "kafka.service.address"
+ #   topic: "topic-name"
+ #   fields: [kafka-field-0, kafka-field-1, kafka-field-2]
+ #   nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
+ #   vertex: {
+ #     field: kafka-field-0
+ #   }
+ #   partition: 10
+ #   batch: 10
+ #   interval.seconds: 10
+ # }

# MySql
{
