From 393136f797eefeb8b09c2eeb7414fb242f69d6ab Mon Sep 17 00:00:00 2001
From: luoshangjun
Date: Tue, 23 Nov 2021 12:02:38 +0800
Subject: [PATCH] kafka-value-parser

---
 .../vesoft/nebula/exchange/ErrorHandler.scala | 10 ++++--
 .../com/vesoft/nebula/exchange/Exchange.scala | 15 +++++---
 .../nebula/exchange/config/Configs.scala      | 10 ++++++
 .../exchange/reader/StreamingBaseReader.scala | 26 ++++++++++----
 .../src/test/resources/application.conf      | 34 +++++++++----------
 5 files changed, 65 insertions(+), 30 deletions(-)

diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/ErrorHandler.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/ErrorHandler.scala
index 267af401..96a76b8e 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/ErrorHandler.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/ErrorHandler.scala
@@ -46,8 +46,14 @@ object ErrorHandler {
    */
   def save(buffer: ArrayBuffer[String], path: String): Unit = {
     LOG.info(s"create reload path $path")
-    val fileSystem = FileSystem.get(new Configuration())
-    val errors     = fileSystem.create(new Path(path))
+    val fileSystem = FileSystem.get(new Configuration())
+    val targetPath = new Path(path)
+    val errors = if (fileSystem.exists(targetPath)) {
+      // For Kafka, the error ngql must be appended to the same file instead of overwriting it
+      fileSystem.append(targetPath)
+    } else {
+      fileSystem.create(targetPath)
+    }
 
     try {
       for (error <- buffer) {
diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
index 694644b2..27f75101 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
@@ -131,7 +131,8 @@ object Exchange {
         val nebulaKeys = tagConfig.nebulaFields
         LOG.info(s"nebula keys: ${nebulaKeys.mkString(", ")}")
-        val data = createDataSource(spark, tagConfig.dataSourceConfigEntry)
+        val fields = tagConfig.vertexField :: tagConfig.fields
+        val data   = createDataSource(spark, tagConfig.dataSourceConfigEntry, fields)
         if (data.isDefined && !c.dry) {
           val startTime = System.currentTimeMillis()
           val batchSuccess =
@@ -172,7 +173,12 @@ object Exchange {
         LOG.info(s"field keys: ${fieldKeys.mkString(", ")}")
         val nebulaKeys = edgeConfig.nebulaFields
         LOG.info(s"nebula keys: ${nebulaKeys.mkString(", ")}")
-        val data = createDataSource(spark, edgeConfig.dataSourceConfigEntry)
+        val fields = if (edgeConfig.rankingField.isDefined) {
+          edgeConfig.rankingField.get :: edgeConfig.sourceField :: edgeConfig.targetField :: edgeConfig.fields
+        } else {
+          edgeConfig.sourceField :: edgeConfig.targetField :: edgeConfig.fields
+        }
+        val data = createDataSource(spark, edgeConfig.dataSourceConfigEntry, fields)
         if (data.isDefined && !c.dry) {
           val startTime = System.currentTimeMillis()
           val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.${edgeConfig.name}")
@@ -228,7 +234,8 @@ object Exchange {
    */
   private[this] def createDataSource(
       session: SparkSession,
-      config: DataSourceConfigEntry
+      config: DataSourceConfigEntry,
+      fields: List[String]
   ): Option[DataFrame] = {
     config.category match {
       case SourceCategory.PARQUET =>
@@ -260,7 +267,7 @@ object Exchange {
       case SourceCategory.KAFKA => {
         val kafkaConfig = config.asInstanceOf[KafkaSourceConfigEntry]
         LOG.info(s"""Loading from Kafka ${kafkaConfig.server} and subscribe ${kafkaConfig.topic}""")
-        val reader = new KafkaReader(session, kafkaConfig)
+        val reader = new KafkaReader(session, kafkaConfig, fields)
         Some(reader.read())
       }
       case SourceCategory.NEO4J =>
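Note on the Exchange.scala change above: the `fields` list built for each tag or edge is exactly the set of keys the KafkaReader will later extract from the message value. A minimal REPL-style sketch of the same cons-list assembly for an edge; "src", "dst", "rank", and "degree" are made-up stand-ins for the edgeConfig entries, not values from this patch:

// Sketch only: hypothetical stand-ins for edgeConfig.sourceField,
// edgeConfig.targetField, edgeConfig.rankingField and edgeConfig.fields.
val edgeFields   = List("degree")
val sourceField  = "src"
val targetField  = "dst"
val rankingField = Option("rank")

val fields = rankingField match {
  case Some(rank) => rank :: sourceField :: targetField :: edgeFields
  case None       => sourceField :: targetField :: edgeFields
}
// fields == List("rank", "src", "dst", "degree"): every column the Kafka
// JSON value has to provide for this edge.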
diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/Configs.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/Configs.scala
index 1735ade6..0dc859e1 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/Configs.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/Configs.scala
@@ -338,10 +338,15 @@ object Configs {
       hiveEntryOpt = Option(hiveEntry)
     }
 
+    var hasKafka = false
+
     val tags       = mutable.ListBuffer[TagConfigEntry]()
     val tagConfigs = getConfigsOrNone(config, "tags")
     if (tagConfigs.isDefined) {
       for (tagConfig <- tagConfigs.get.asScala) {
+        if (hasKafka) {
+          throw new IllegalArgumentException("Cannot define any other configs when a Kafka source exists")
+        }
         if (!tagConfig.hasPath("name") ||
             !tagConfig.hasPath("type.source") ||
             !tagConfig.hasPath("type.sink")) {
@@ -375,6 +380,7 @@ object Configs {
         val sourceCategory = toSourceCategory(tagConfig.getString("type.source"))
         val sourceConfig   = dataSourceConfig(sourceCategory, tagConfig, nebulaConfig)
         LOG.info(s"Source Config ${sourceConfig}")
+        hasKafka = sourceCategory == SourceCategory.KAFKA
 
         val sinkCategory = toSinkCategory(tagConfig.getString("type.sink"))
         val sinkConfig   = dataSinkConfig(sinkCategory, nebulaConfig)
@@ -410,6 +416,9 @@ object Configs {
     val edgeConfigs = getConfigsOrNone(config, "edges")
     if (edgeConfigs.isDefined) {
       for (edgeConfig <- edgeConfigs.get.asScala) {
+        if (hasKafka) {
+          throw new IllegalArgumentException("Cannot define any other configs when a Kafka source exists")
+        }
         if (!edgeConfig.hasPath("name") ||
             !edgeConfig.hasPath("type.source") ||
             !edgeConfig.hasPath("type.sink")) {
@@ -432,6 +441,7 @@ object Configs {
         val sourceCategory = toSourceCategory(edgeConfig.getString("type.source"))
         val sourceConfig   = dataSourceConfig(sourceCategory, edgeConfig, nebulaConfig)
         LOG.info(s"Source Config ${sourceConfig}")
+        hasKafka = sourceCategory == SourceCategory.KAFKA
 
         val sinkCategory = toSinkCategory(edgeConfig.getString("type.sink"))
         val sinkConfig   = dataSinkConfig(sinkCategory, nebulaConfig)
diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/reader/StreamingBaseReader.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/reader/StreamingBaseReader.scala
index 7e8cfaa1..332060ab 100644
--- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/reader/StreamingBaseReader.scala
+++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/reader/StreamingBaseReader.scala
@@ -6,7 +6,10 @@
 package com.vesoft.nebula.exchange.reader
 
 import com.vesoft.nebula.exchange.config.{KafkaSourceConfigEntry, PulsarSourceConfigEntry}
 import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.functions.{col, from_json}
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
 
 /**
   * Spark Streaming
@@ -25,17 +28,26 @@ abstract class StreamingBaseReader(override val session: SparkSession) extends R
   * @param session
   * @param kafkaConfig
+  * @param targetFields
   */
-class KafkaReader(override val session: SparkSession, kafkaConfig: KafkaSourceConfigEntry)
+class KafkaReader(override val session: SparkSession,
+                  kafkaConfig: KafkaSourceConfigEntry,
+                  targetFields: List[String])
     extends StreamingBaseReader(session) {
 
-  require(kafkaConfig.server.trim.nonEmpty && kafkaConfig.topic.trim.nonEmpty)
+  require(kafkaConfig.server.trim.nonEmpty && kafkaConfig.topic.trim.nonEmpty && targetFields.nonEmpty)
 
   override def read(): DataFrame = {
+    val fields     = targetFields.distinct
+    val jsonSchema = StructType(fields.map(field => StructField(field, StringType, nullable = true)))
     session.readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", kafkaConfig.server)
       .option("subscribe", kafkaConfig.topic)
       .load()
+      .selectExpr("CAST(value AS STRING)")
+      .withColumn("value", from_json(col("value"), jsonSchema))
+      .select("value.*")
   }
 }
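For reference, the parsing step the KafkaReader change above introduces can be exercised in isolation. A minimal sketch, assuming a local Spark session; the object name is hypothetical, the field names mirror the sample tag config below, and a batch DataFrame stands in for the streaming source so the result can be printed directly:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object KafkaValueParseSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()
    import spark.implicits._

    // Stand-in for the Kafka `value` column after CAST(value AS STRING).
    val raw = Seq("""{"kafka-field-0": "v0", "kafka-field-1": "v1"}""").toDF("value")

    // Same schema construction as KafkaReader.read(): every target field
    // becomes a nullable string column.
    val fields     = List("kafka-field-0", "kafka-field-1", "kafka-field-2").distinct
    val jsonSchema = StructType(fields.map(field => StructField(field, StringType, nullable = true)))

    raw
      .withColumn("value", from_json(col("value"), jsonSchema))
      .select("value.*")
      .show(false) // kafka-field-2 is absent from the message, so it comes back null
  }
}

Fields missing from a message surface as null rather than failing the batch, which is why every column in the schema is nullable.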
diff --git a/nebula-exchange/src/test/resources/application.conf b/nebula-exchange/src/test/resources/application.conf
index 29649676..447d267a 100644
--- a/nebula-exchange/src/test/resources/application.conf
+++ b/nebula-exchange/src/test/resources/application.conf
@@ -224,23 +224,23 @@
     }
 
     # KAFKA
-    {
-      name: tag7
-      type: {
-        source: kafka
-        sink: client
-      }
-      service: "kafka.service.address"
-      topic: "topic-name"
-      fields: [kafka-field-0, kafka-field-1, kafka-field-2]
-      nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
-      vertex: {
-        field: kafka-field-0
-      }
-      partition: 10
-      batch: 10
-      interval.seconds: 10
-    }
+    # {
+    #   name: tag7
+    #   type: {
+    #     source: kafka
+    #     sink: client
+    #   }
+    #   service: "kafka.service.address"
+    #   topic: "topic-name"
+    #   fields: [kafka-field-0, kafka-field-1, kafka-field-2]
+    #   nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
+    #   vertex: {
+    #     field: kafka-field-0
+    #   }
+    #   partition: 10
+    #   batch: 10
+    #   interval.seconds: 10
+    # }
 
     # MySql
     {