[GLUTEN-1898][CH] S3 client support per bucket configs and support assume role access (facebookincubator#1899)

* [GLUTEN-1898][CH] S3 client support per bucket configs and support assume role access

* fix bug

* update clickhouse commit id

* fix stash

* allow endpoint not contain https prefix
binmahone authored Jun 18, 2023
1 parent 0a2931a commit 320ee31
Showing 7 changed files with 489 additions and 81 deletions.
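As context for the diff below, here is a minimal, hypothetical sketch of the per-bucket assume-role configuration style this commit enables. Bucket names, account ids, role ARNs, and endpoints are placeholders; the real settings are exercised in the new S3AuthSuite further down.

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .master("local[1]")
  .config("spark.plugins", "io.glutenproject.GlutenPlugin")
  // Base credentials: authenticate with AK/SK first, then assume per-bucket roles via STS.
  .config("spark.hadoop.fs.s3a.access.key", "<access-key>")
  .config("spark.hadoop.fs.s3a.secret.key", "<secret-key>")
  .config(
    "spark.hadoop.fs.s3a.assumed.role.credentials.provider",
    "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
  .config(
    "spark.hadoop.fs.s3a.aws.credentials.provider",
    "org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider")
  // Per-bucket overrides: each bucket may use its own role, session name, and endpoint.
  // Per the commit, the endpoint may also be given without an https:// prefix.
  .config("spark.hadoop.fs.s3a.bucket.bucket-a.assumed.role.arn", "arn:aws:iam::111111111111:role/role-a")
  .config("spark.hadoop.fs.s3a.bucket.bucket-a.assumed.role.session.name", "session-a")
  .config("spark.hadoop.fs.s3a.bucket.bucket-a.endpoint", "s3.us-west-2.amazonaws.com")
  .config("spark.hadoop.fs.s3a.bucket.bucket-b.assumed.role.arn", "arn:aws:iam::111111111111:role/role-b")
  .config("spark.hadoop.fs.s3a.bucket.bucket-b.endpoint", "s3.us-east-2.amazonaws.com")
  .getOrCreate()

// Different buckets resolve to different roles and endpoints within one session.
spark.read.parquet("s3a://bucket-a/some_table").show(10)
spark.read.parquet("s3a://bucket-b/some_table").show(10)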
@@ -27,7 +27,7 @@ import java.nio.file.Files

import scala.collection.immutable.Seq

class GlutenFunctionValidateSuit extends WholeStageTransformerSuite {
class GlutenFunctionValidateSuite extends WholeStageTransformerSuite {
override protected val resourcePath: String = {
"../../../../gluten-core/src/test/resources/tpch-data"
}
@@ -0,0 +1,269 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.glutenproject.s3

import io.glutenproject.GlutenConfig

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession.Builder

import org.apache.hadoop.fs.FileSystem
import org.scalatest.Ignore
import org.scalatest.funsuite.AnyFunSuite

// scalastyle:off
@Ignore
class S3AuthSuite extends AnyFunSuite {
val libPath = "/usr/local/clickhouse/lib/libch.so"

// Throughout this test, the trusted user will visit three buckets:
// 1. a bucket (trustedOwnedBucket) owned by himself.
// 2. a bucket (trustingBucket) owned by a trusting user, which he can access by assuming a role
// 3. another bucket (trustingBucket2) owned by the same trusting user, which he can access by assuming a different role

// For performance reasons, we use the same parquet file in all three buckets. The parquet file is the nation table
// from tpch10, which is only 3KB.

/** WARNING, the AK/SK should not be committed to github !!!!!!!!!!!!!!! */

// this is the "trusting user part", e.g. data in customer's bucket
val trustingBucket = "mhb-dev-bucket" // in us-west-2, @ global dev
val trustingParquetPath = s"s3a://$trustingBucket/tpch10_nation"
val trustingEndpoint = "s3.us-west-2.amazonaws.com"
val trustingBucket2 = "mhb-dev-bucket-us-east-2" // another bucket in us-east-2, @ global dev
val trustingParquetPath2 = s"s3a://$trustingBucket2/tpch10_nation"
val trustingEndpoint2 = "s3.us-east-2.amazonaws.com"

// this is the "trusted user part", e.g. consumer of customer's data
val trustedAK = "" // @ global prod
val trustedSK = ""
val trustedOwnedBucket = "mhb-prod-bucket" // another bucket in us-west-2
val trustedOwnedParquetPath = s"s3a://$trustedOwnedBucket/tpch10_nation"
val trustedOwnedEndpoint = "s3.us-west-2.amazonaws.com"
val trustedAssumeRole =
"arn:aws:iam::429636537981:role/r_mhb_cross_access_to_prod" // -> has access to parquetPath
val trustedSessionName = "test-session"
val trustedAssumeRole2 =
"arn:aws:iam::429636537981:role/r_mhb_cross_access_to_prod2" // -> has access to parquetPath2
// Here the external id looks as if it were tested, but it is not. To make external id work we would have to overwrite
// and modify AssumedRoleCredentialProvider, which is also modified in Kyligence/hadoop-aws. To avoid conflicts
// we choose not to overwrite AssumedRoleCredentialProvider in Gluten. So without Kyligence/hadoop-aws,
// external id is NOT supported, and currently trustedAssumeRole2 should NOT have an external id set.
val trustedExternalId2 = "123" // it's just a placeholder !!!
val trustedSessionName2 = "test-session-2"

// a separate test case for AWS CN
val cnEndpoint = "s3.cn-northwest-1.amazonaws.com.cn"
val cnAK = ""
val cnSK = ""
val cnParquetPath = "s3a://mhb-cn-private/test_nation"

val longRunningAuthModes = List(
"AKSK" // this requires providing AK/SK of the trusted user
// "INSTANCE_PROFILE", // this requires running on a EC2 instance, with trusted user's instance profile
// "PROFILE_FILE" // this requires ~/.aws/credentials configured with the trusted user.
)
val enableGlutenOrNot = List(true, false)

implicit class ImplicitBuilder(val builder: Builder) {

def withAuthMode(mode: String, assuming: Boolean): Builder = {
val providerKey =
if (assuming) "spark.hadoop.fs.s3a.assumed.role.credentials.provider"
else "spark.hadoop.fs.s3a.aws.credentials.provider";

mode match {
case "AKSK" =>
builder
.config("spark.hadoop.fs.s3a.access.key", trustedAK)
.config("spark.hadoop.fs.s3a.secret.key", trustedSK)
.config(providerKey, "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
case "INSTANCE_PROFILE" =>
builder.config(providerKey, "com.amazonaws.auth.InstanceProfileCredentialsProvider")
case "PROFILE_FILE" =>
builder.config(providerKey, "com.amazonaws.auth.profile.ProfileCredentialsProvider")
case _ =>
}
builder
}

def withGluten(enable: Boolean): Builder = {
builder.config("spark.gluten.enabled", enable.toString)
}
}

for (mode <- longRunningAuthModes; enable <- enableGlutenOrNot) {
test(s"trusted user authed with mode: $mode, gluten enabled: $enable, assuming role: false") {
val spark = SparkSession
.builder()
.appName("Gluten-S3-Test")
.master(s"local[1]")
.config("spark.plugins", "io.glutenproject.GlutenPlugin")
.config(GlutenConfig.GLUTEN_LIB_PATH, libPath)
.config("spark.memory.offHeap.enabled", "true")
.config("spark.memory.offHeap.size", "1g")
.config("spark.gluten.sql.enable.native.validation", "false")
.config("spark.hadoop.fs.s3a.endpoint", trustedOwnedEndpoint)
// The following two configs are provided to help hadoop-aws pass.
// They're not required by native code (they don't have the spark.hadoop prefix,
// so native code will not see them).
// They're also unnecessary in a real EC2 instance environment (at least that's the case with Kyligence/hadoop-aws).
.config("fs.s3a.assumed.role.sts.endpoint.region", "us-east-1")
.config("fs.s3a.assumed.role.sts.endpoint", "sts.us-east-1.amazonaws.com")
.withAuthMode(mode, assuming = false)
.withGluten(enable)
.enableHiveSupport()
.getOrCreate()

spark.sparkContext.setLogLevel("WARN")
try {
spark.read.parquet(trustedOwnedParquetPath).show(10)
} finally {
spark.close
FileSystem.closeAll()
}
}

test(s"trusted user authed with mode: $mode, gluten enabled: $enable, assuming role: true") {
val spark = SparkSession
.builder()
.appName("Gluten-S3-Test")
.master(s"local[1]")
.config("spark.plugins", "io.glutenproject.GlutenPlugin")
.config(GlutenConfig.GLUTEN_LIB_PATH, libPath)
.config("spark.memory.offHeap.enabled", "true")
.config("spark.memory.offHeap.size", "1g")
.config("spark.gluten.sql.enable.native.validation", "false")
.config("spark.hadoop.fs.s3a.endpoint", trustingEndpoint)
.config(
"spark.hadoop.fs.s3a.aws.credentials.provider",
"org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider")
.config("spark.hadoop.fs.s3a.assumed.role.arn", trustedAssumeRole)
.config("spark.hadoop.fs.s3a.assumed.role.session.name", trustedSessionName)
// .config("spark.hadoop.fs.s3a.assumed.role.externalId", "") // trustedAssumeRole has no external id
// The following two configs are provided to help hadoop-aws pass.
// They're not required by native code (they don't have the spark.hadoop prefix,
// so native code will not see them).
// They're also unnecessary in a real EC2 instance environment (at least that's the case with Kyligence/hadoop-aws).
.config("fs.s3a.assumed.role.sts.endpoint.region", "us-east-1")
.config("fs.s3a.assumed.role.sts.endpoint", "sts.us-east-1.amazonaws.com")
.withAuthMode(mode, assuming = true)
.withGluten(enable)
.enableHiveSupport()
.getOrCreate()

spark.sparkContext.setLogLevel("WARN")
try {
spark.read.parquet(trustingParquetPath).show(10)
} finally {
spark.close
FileSystem.closeAll()
}
}

test(
s"trusted user authed with mode: $mode, gluten enabled: $enable, assuming two roles in one session") {
val spark = SparkSession
.builder()
.appName("Gluten-S3-Test")
.master(s"local[1]")
.config("spark.plugins", "io.glutenproject.GlutenPlugin")
.config(GlutenConfig.GLUTEN_LIB_PATH, libPath)
.config("spark.memory.offHeap.enabled", "true")
.config("spark.memory.offHeap.size", "1g")
.config("spark.gluten.sql.enable.native.validation", "false")
.config("spark.hadoop.fs.s3a.endpoint", trustingEndpoint2)
.config(
"spark.hadoop.fs.s3a.aws.credentials.provider",
"org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider")
.config(s"spark.hadoop.fs.s3a.bucket.$trustingBucket.assumed.role.arn", trustedAssumeRole)
.config(s"spark.hadoop.fs.s3a.bucket.$trustingBucket.assumed.role.externalId", "")
.config(
s"spark.hadoop.fs.s3a.bucket.$trustingBucket.assumed.role.session.name",
trustedSessionName)
.config(s"spark.hadoop.fs.s3a.bucket.$trustingBucket.endpoint", trustingEndpoint)
.config(s"spark.hadoop.fs.s3a.bucket.$trustingBucket2.assumed.role.arn", trustedAssumeRole2)
.config(
s"spark.hadoop.fs.s3a.bucket.$trustingBucket2.assumed.role.session.name",
trustedSessionName2)
.config(
s"spark.hadoop.fs.s3a.bucket.$trustingBucket2.assumed.role.externalId",
trustedExternalId2
)
// The following two configs are provided to help hadoop-aws pass.
// They're not required by native code (they don't have the spark.hadoop prefix,
// so native code will not see them).
// They're also unnecessary in a real EC2 instance environment (at least that's the case with Kyligence/hadoop-aws).
.config("fs.s3a.assumed.role.sts.endpoint.region", "us-east-1")
.config("fs.s3a.assumed.role.sts.endpoint", "sts.us-east-1.amazonaws.com")
.withAuthMode(mode, assuming = true)
.withGluten(enable)
.enableHiveSupport()
.getOrCreate()

try {
spark.read.parquet(trustingParquetPath).show(10)
spark.read.parquet(trustingParquetPath2).show(10)
} finally {
spark.close
FileSystem.closeAll()
}
}
}

// a special case for CN aws
test("CN: simple ak sk") {
val spark = SparkSession
.builder()
.appName("Gluten-S3-Test")
.master(s"local[1]")
.config("spark.plugins", "io.glutenproject.GlutenPlugin")
.config(GlutenConfig.GLUTEN_LIB_PATH, libPath)
.config("spark.memory.offHeap.enabled", "true")
.config("spark.memory.offHeap.size", "1g")
.config("spark.gluten.sql.enable.native.validation", "false")
.config("spark.hadoop.fs.s3a.endpoint", cnEndpoint)
.config("spark.hadoop.fs.s3a.access.key", cnAK)
.config("spark.hadoop.fs.s3a.secret.key", cnSK)
.config(
"spark.hadoop.fs.s3a.aws.credentials.provider",
"org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
.enableHiveSupport()
.getOrCreate()

spark.sparkContext.setLogLevel("WARN")
spark.read.parquet(cnParquetPath).show(10)
spark.close()
}

def close(_spark: SparkSession): Unit = {
try {
if (_spark != null) {
try {
_spark.sessionState.catalog.reset()
} finally {
_spark.stop()
}
}
} finally {
SparkSession.clearActiveSession()
SparkSession.clearDefaultSession()
}
}

}
// scalastyle:on
2 changes: 1 addition & 1 deletion cpp-ch/clickhouse.version
@@ -1,3 +1,3 @@
CH_ORG=Kyligence
CH_BRANCH=rebase_ch/20230616
CH_COMMIT=f1a0c1c
CH_COMMIT=f1a0c1c
34 changes: 31 additions & 3 deletions cpp-ch/local-engine/Common/CHUtil.cpp
@@ -48,6 +48,7 @@
#include <boost/algorithm/string/case_conv.hpp>
#include <boost/algorithm/string/predicate.hpp>

#include <regex>
#include "CHUtil.h"

namespace DB
@@ -471,17 +472,44 @@ void BackendInitializerUtil::initConfig(std::string * plan)
else
config = Poco::AutoPtr(new Poco::Util::MapConfiguration());

/// Apply spark.gluten.sql.columnar.backend.ch.runtime_config.* to config
for (const auto & kv : backend_conf_map)
{
const auto & key = kv.first;
const auto & value = kv.second;
// std::cout << "set config key:" << key << ", value:" << value << std::endl;

if (key.starts_with(CH_RUNTIME_CONFIG_PREFIX) && key != CH_RUNTIME_CONFIG_FILE)
{
/// Apply spark.gluten.sql.columnar.backend.ch.runtime_config.* to config
config->setString(key.substr(CH_RUNTIME_CONFIG_PREFIX.size()), value);
else if (S3_CONFIGS.find(key) != S3_CONFIGS.end())
config->setString(S3_CONFIGS.at(key), value);
}
else if (key.starts_with(SPARK_HADOOP_PREFIX + S3A_PREFIX + "bucket"))
{
// deal with per-bucket S3 configs, e.g. fs.s3a.bucket.bucket_name.assumed.role.arn
// For Gluten, we require authenticating with AK/SK (or an instance profile) first and then assuming other roles via STS,
// so only the following per-bucket configs are supported:
// 1. fs.s3a.bucket.bucket_name.assumed.role.arn
// 2. fs.s3a.bucket.bucket_name.assumed.role.session.name
// 3. fs.s3a.bucket.bucket_name.endpoint
// 4. fs.s3a.bucket.bucket_name.assumed.role.externalId (not an official hadoop config)

// For spark.hadoop.fs.s3a.bucket.bucket_name.assumed.role.arn, put bucket_name.fs.s3a.assumed.role.arn into config
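// Illustrative example (the bucket name "mybucket" is hypothetical):
//   key     = "spark.hadoop.fs.s3a.bucket.mybucket.assumed.role.arn"
//   new_key = "fs.s3a.bucket.mybucket.assumed.role.arn"   (after stripping the spark.hadoop. prefix)
//   the regex below then removes "bucket.mybucket.", so the key stored in config becomes
//   "mybucket.fs.s3a.assumed.role.arn"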
std::regex base_regex("bucket\\.([^\\.]+)\\.");
std::smatch base_match;
std::string new_key = key.substr(SPARK_HADOOP_PREFIX.length());
if (std::regex_search(new_key, base_match, base_regex))
{
std::string bucket_name = base_match[1].str();
new_key.replace(base_match[0].first - new_key.begin(), base_match[0].second - base_match[0].first, "");
config->setString(bucket_name + "." + new_key, value);
}
}
else if (key.starts_with(SPARK_HADOOP_PREFIX + S3A_PREFIX))
{
// Apply general S3 configs, e.g. spark.hadoop.fs.s3a.access.key -> set in fs.s3a.access.key
config->setString(key.substr(SPARK_HADOOP_PREFIX.length()), value);
}

}
}

40 changes: 23 additions & 17 deletions cpp-ch/local-engine/Common/CHUtil.h
@@ -99,6 +99,29 @@ class BackendInitializerUtil

// use excel text parser
inline static const std::string USE_EXCEL_PARSER = "use_excel_serialization";
inline static const String CH_BACKEND_PREFIX = "spark.gluten.sql.columnar.backend.ch";

inline static const String CH_RUNTIME_CONFIG = "runtime_config";
inline static const String CH_RUNTIME_CONFIG_PREFIX = CH_BACKEND_PREFIX + "." + CH_RUNTIME_CONFIG + ".";
inline static const String CH_RUNTIME_CONFIG_FILE = CH_RUNTIME_CONFIG_PREFIX + "config_file";

inline static const String CH_RUNTIME_SETTINGS = "runtime_settings";
inline static const String CH_RUNTIME_SETTINGS_PREFIX = CH_BACKEND_PREFIX + "." + CH_RUNTIME_SETTINGS + ".";

inline static const String LIBHDFS3_CONF_KEY = "hdfs.libhdfs3_conf";
inline static const String SETTINGs_PATH = "local_engine.settings";
inline static const std::string HADOOP_S3_ACCESS_KEY = "fs.s3a.access.key";
inline static const std::string HADOOP_S3_SECRET_KEY = "fs.s3a.secret.key";
inline static const std::string HADOOP_S3_ENDPOINT = "fs.s3a.endpoint";
inline static const std::string HADOOP_S3_ASSUMED_ROLE = "fs.s3a.assumed.role.arn";
inline static const std::string HADOOP_S3_ASSUMED_SESSION_NAME = "fs.s3a.assumed.role.session.name";
// not an official hadoop config
inline static const std::string HADOOP_S3_ASSUMED_EXTERNAL_ID = "fs.s3a.assumed.role.externalId";
// an official hadoop config; this is used to ignore the cached client
inline static const std::string HADOOP_S3_CLIENT_CACHE_IGNORE = "fs.s3a.client.cached.ignore";
inline static const std::string SPARK_HADOOP_PREFIX = "spark.hadoop.";
inline static const std::string S3A_PREFIX = "fs.s3a.";

private:
friend class BackendFinalizerUtil;
friend class JNIUtils;
@@ -115,23 +138,6 @@

static std::map<std::string, std::string> getBackendConfMap(const std::string & plan);

inline static const String CH_BACKEND_PREFIX = "spark.gluten.sql.columnar.backend.ch";

inline static const String CH_RUNTIME_CONFIG = "runtime_config";
inline static const String CH_RUNTIME_CONFIG_PREFIX = CH_BACKEND_PREFIX + "." + CH_RUNTIME_CONFIG + ".";
inline static const String CH_RUNTIME_CONFIG_FILE = CH_RUNTIME_CONFIG_PREFIX + "config_file";

inline static const String CH_RUNTIME_SETTINGS = "runtime_settings";
inline static const String CH_RUNTIME_SETTINGS_PREFIX = CH_BACKEND_PREFIX + "." + CH_RUNTIME_SETTINGS + ".";

inline static const String LIBHDFS3_CONF_KEY = "hdfs.libhdfs3_conf";
inline static const String SETTINGs_PATH = "local_engine.settings";
inline static const std::string SPARK_S3_ACCESS_KEY = "spark.hadoop.fs.s3a.access.key";
inline static const std::string SPARK_S3_SECRET_KEY = "spark.hadoop.fs.s3a.secret.key";
inline static const std::string SPARK_S3_ENDPOINT = "spark.hadoop.fs.s3a.endpoint";
inline static const std::map<std::string, std::string> S3_CONFIGS
= {{SPARK_S3_ACCESS_KEY, "s3.access_key_id"}, {SPARK_S3_SECRET_KEY, "s3.secret_access_key"}, {SPARK_S3_ENDPOINT, "s3.endpoint"}};

inline static std::once_flag init_flag;
inline static std::map<std::string, std::string> backend_conf_map;
inline static DB::Context::ConfigurationPtr config;