diff --git a/nebula-exchange/src/main/resources/application.conf b/nebula-exchange/src/main/resources/application.conf index 8b219468..00fe83f7 100644 --- a/nebula-exchange/src/main/resources/application.conf +++ b/nebula-exchange/src/main/resources/application.conf @@ -277,6 +277,8 @@ accessKeyId:xxx accessKeySecret:xxx partitionSpec:"dt='partition1'" + # default numPartitions is 1 + numPartitions:100 # maxcompute sql sentence only uses table name. make sure that table name is the same with {table}'s value'. sentence:"select id, maxcompute-field-0, maxcompute-field-1, maxcompute-field-2 from table where id < 10" fields:[maxcompute-field-0, maxcompute-field-1] diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/Configs.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/Configs.scala index c1f47913..1735ade6 100644 --- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/Configs.scala +++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/Configs.scala @@ -688,6 +688,11 @@ object Configs { } else { null } + val numPartitions = if (config.hasPath("numPartitions")) { + config.getString("numPartitions") + } else { + "1" + } val sentence = if (config.hasPath("sentence")) { config.getString("sentence") @@ -704,6 +709,7 @@ object Configs { config.getString("accessKeyId"), config.getString("accessKeySecret"), partitionSpec, + numPartitions, sentence ) } diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/SourceConfigs.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/SourceConfigs.scala index f87e22a1..44a296cb 100644 --- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/SourceConfigs.scala +++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/config/SourceConfigs.scala @@ -230,6 +230,7 @@ case class MaxComputeConfigEntry(override val category: SourceCategory.Value, accessKeyId: String, accessKeySecret: String, partitionSpec: String, + numPartitions: String, override val sentence: String) extends ServerDataSourceConfigEntry { require( @@ -239,7 +240,7 @@ case class MaxComputeConfigEntry(override val category: SourceCategory.Value, override def toString: String = { s"MaxCompute source {odpsUrl: $odpsUrl, tunnelUrl: $tunnelUrl, table: $table, project: $project, " + s"keyId: $accessKeyId, keySecret: $accessKeySecret, partitionSpec:$partitionSpec, " + - s"sentence:$sentence}" + s"numPartitions:$numPartitions, sentence:$sentence}" } } diff --git a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/reader/ServerBaseReader.scala b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/reader/ServerBaseReader.scala index 4174ae8f..c32ae00b 100644 --- a/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/reader/ServerBaseReader.scala +++ b/nebula-exchange/src/main/scala/com/vesoft/nebula/exchange/reader/ServerBaseReader.scala @@ -276,6 +276,7 @@ class MaxcomputeReader(override val session: SparkSession, maxComputeConfig: Max .option("project", maxComputeConfig.project) .option("accessKeyId", maxComputeConfig.accessKeyId) .option("accessKeySecret", maxComputeConfig.accessKeySecret) + .option("numPartitions", maxComputeConfig.numPartitions) // if use partition read if (maxComputeConfig.partitionSpec != null) {