Skip to content

Commit

Permalink
add partition config for maxcompute
Browse files Browse the repository at this point in the history
  • Loading branch information
Nicole00 committed Nov 10, 2021
1 parent cb1c22b commit 4e2e491
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 1 deletion.
2 changes: 2 additions & 0 deletions nebula-exchange/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,8 @@
accessKeyId:xxx
accessKeySecret:xxx
partitionSpec:"dt='partition1'"
# default numPartitions is 1
numPartitions:100
# maxcompute sql sentence only uses table name. make sure that table name is the same with {table}'s value'.
sentence:"select id, maxcompute-field-0, maxcompute-field-1, maxcompute-field-2 from table where id < 10"
fields:[maxcompute-field-0, maxcompute-field-1]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,11 @@ object Configs {
} else {
null
}
val numPartitions = if (config.hasPath("numPartitions")) {
config.getString("numPartitions")
} else {
"1"
}

val sentence = if (config.hasPath("sentence")) {
config.getString("sentence")
Expand All @@ -705,6 +710,7 @@ object Configs {
config.getString("accessKeyId"),
config.getString("accessKeySecret"),
partitionSpec,
numPartitions,
sentence
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ case class MaxComputeConfigEntry(override val category: SourceCategory.Value,
accessKeyId: String,
accessKeySecret: String,
partitionSpec: String,
numPartitions: String,
override val sentence: String)
extends ServerDataSourceConfigEntry {
require(
Expand All @@ -240,7 +241,7 @@ case class MaxComputeConfigEntry(override val category: SourceCategory.Value,
override def toString: String = {
s"MaxCompute source {odpsUrl: $odpsUrl, tunnelUrl: $tunnelUrl, table: $table, project: $project, " +
s"keyId: $accessKeyId, keySecret: $accessKeySecret, partitionSpec:$partitionSpec, " +
s"sentence:$sentence}"
s"numPartitions:$numPartitions, sentence:$sentence}"
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ class MaxcomputeReader(override val session: SparkSession, maxComputeConfig: Max
.option("project", maxComputeConfig.project)
.option("accessKeyId", maxComputeConfig.accessKeyId)
.option("accessKeySecret", maxComputeConfig.accessKeySecret)
.option("numPartitions", maxComputeConfig.numPartitions)

// if use partition read
if (maxComputeConfig.partitionSpec != null) {
Expand Down

0 comments on commit 4e2e491

Please sign in to comment.