Skip to content

Commit

Permalink
Refactor HiveConfig (#7725)
Browse files Browse the repository at this point in the history
Summary:
To solve issue: #7659

Pull Request resolved: #7725

Reviewed By: xiaoxmeng

Differential Revision: D51814038

Pulled By: kewang1024

fbshipit-source-id: d9cf4e19ec0e6188caf61ec0097272e0496bf4a5
  • Loading branch information
kewang1024 authored and facebook-github-bot committed Dec 11, 2023
1 parent 023191c commit eb75367
Show file tree
Hide file tree
Showing 39 changed files with 588 additions and 364 deletions.
25 changes: 10 additions & 15 deletions velox/connectors/Connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ class ConnectorQueryCtx {
ConnectorQueryCtx(
memory::MemoryPool* operatorPool,
memory::MemoryPool* connectorPool,
const Config* connectorConfig,
const Config* sessionProperties,
const common::SpillConfig* spillConfig,
std::unique_ptr<core::ExpressionEvaluator> expressionEvaluator,
cache::AsyncDataCache* cache,
Expand All @@ -247,7 +247,7 @@ class ConnectorQueryCtx {
int driverId)
: operatorPool_(operatorPool),
connectorPool_(connectorPool),
config_(connectorConfig),
sessionProperties_(sessionProperties),
spillConfig_(spillConfig),
expressionEvaluator_(std::move(expressionEvaluator)),
cache_(cache),
Expand All @@ -256,7 +256,7 @@ class ConnectorQueryCtx {
taskId_(taskId),
driverId_(driverId),
planNodeId_(planNodeId) {
VELOX_CHECK_NOT_NULL(connectorConfig);
VELOX_CHECK_NOT_NULL(sessionProperties);
}

/// Returns the associated operator's memory pool which is a leaf kind of
Expand All @@ -272,8 +272,8 @@ class ConnectorQueryCtx {
return connectorPool_;
}

const Config* config() const {
return config_;
/// Returns the session-properties Config pointer held in sessionProperties_.
/// The constructor initializes that member from its 'sessionProperties'
/// argument and checks the argument with VELOX_CHECK_NOT_NULL.
const Config* sessionProperties() const {
return sessionProperties_;
}

const common::SpillConfig* spillConfig() const {
Expand Down Expand Up @@ -315,7 +315,7 @@ class ConnectorQueryCtx {
private:
memory::MemoryPool* const operatorPool_;
memory::MemoryPool* const connectorPool_;
const Config* config_;
const Config* const sessionProperties_;
const common::SpillConfig* const spillConfig_;
std::unique_ptr<core::ExpressionEvaluator> expressionEvaluator_;
cache::AsyncDataCache* cache_;
Expand All @@ -328,19 +328,16 @@ class ConnectorQueryCtx {

class Connector {
public:
explicit Connector(
const std::string& id,
std::shared_ptr<const Config> properties)
: id_(id), properties_(std::move(properties)) {}
/// Constructs a connector, copying 'id' into id_ (returned later by
/// connectorId()).
explicit Connector(const std::string& id) : id_(id) {}

virtual ~Connector() = default;

const std::string& connectorId() const {
return id_;
}

const std::shared_ptr<const Config>& connectorProperties() const {
return properties_;
/// Accessor for the connector-level Config. This base-class implementation
/// has no return statement: it unconditionally invokes VELOX_NYI with the
/// message below. The method is virtual, so a derived connector can override
/// it to supply its own config.
virtual const std::shared_ptr<const Config>& connectorConfig() const {
VELOX_NYI("connectorConfig is not supported yet");
}

// Returns true if this connector would accept a filter dynamically generated
Expand Down Expand Up @@ -391,8 +388,6 @@ class Connector {
static folly::Synchronized<
std::unordered_map<std::string_view, std::weak_ptr<cache::ScanTracker>>>
trackers_;

const std::shared_ptr<const Config> properties_;
};

class ConnectorFactory {
Expand All @@ -410,7 +405,7 @@ class ConnectorFactory {

virtual std::shared_ptr<Connector> newConnector(
const std::string& id,
std::shared_ptr<const Config> properties,
std::shared_ptr<const Config> config,
folly::Executor* FOLLY_NULLABLE executor = nullptr) = 0;

private:
Expand Down
8 changes: 4 additions & 4 deletions velox/connectors/fuzzer/FuzzerConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ class FuzzerConnector final : public Connector {
public:
FuzzerConnector(
const std::string& id,
std::shared_ptr<const Config> properties,
std::shared_ptr<const Config> config,
folly::Executor* FOLLY_NULLABLE /*executor*/)
: Connector(id, properties) {}
: Connector(id) {}

std::unique_ptr<DataSource> createDataSource(
const std::shared_ptr<const RowType>& outputType,
Expand Down Expand Up @@ -139,9 +139,9 @@ class FuzzerConnectorFactory : public ConnectorFactory {

std::shared_ptr<Connector> newConnector(
const std::string& id,
std::shared_ptr<const Config> properties,
std::shared_ptr<const Config> config,
folly::Executor* FOLLY_NULLABLE executor = nullptr) override {
return std::make_shared<FuzzerConnector>(id, properties, executor);
return std::make_shared<FuzzerConnector>(id, config, executor);
}
};

Expand Down
199 changes: 92 additions & 107 deletions velox/connectors/hive/HiveConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,6 @@ stringToInsertExistingPartitionsBehavior(const std::string& strValue) {

} // namespace

// static
HiveConfig::InsertExistingPartitionsBehavior
HiveConfig::insertExistingPartitionsBehavior(const Config* config) {
const auto behavior =
config->get<std::string>(kInsertExistingPartitionsBehavior);
return behavior.has_value()
? stringToInsertExistingPartitionsBehavior(behavior.value())
: InsertExistingPartitionsBehavior::kError;
}

// static
std::string HiveConfig::insertExistingPartitionsBehaviorString(
InsertExistingPartitionsBehavior behavior) {
Expand All @@ -62,166 +52,161 @@ std::string HiveConfig::insertExistingPartitionsBehaviorString(
}
}

// static
uint32_t HiveConfig::maxPartitionsPerWriters(const Config* config) {
return config->get<uint32_t>(kMaxPartitionsPerWriters, 100);
// Resolves the insert-existing-partitions behavior from the connector config.
// The string stored under kInsertExistingPartitionsBehavior is converted with
// stringToInsertExistingPartitionsBehavior(); when the lookup yields no value
// the result is InsertExistingPartitionsBehavior::kError.
HiveConfig::InsertExistingPartitionsBehavior
HiveConfig::insertExistingPartitionsBehavior() const {
  const auto configured =
      config_->get<std::string>(kInsertExistingPartitionsBehavior);
  if (!configured.has_value()) {
    return InsertExistingPartitionsBehavior::kError;
  }
  return stringToInsertExistingPartitionsBehavior(configured.value());
}

// static
bool HiveConfig::immutablePartitions(const Config* config) {
return config->get<bool>(kImmutablePartitions, false);
// Maximum partitions per writer. A value present in 'session' under
// kMaxPartitionsPerWritersSession takes precedence; otherwise the connector
// config key kMaxPartitionsPerWriters is read with 100 as the fallback
// argument.
uint32_t HiveConfig::maxPartitionsPerWriters(const Config* session) const {
  const bool hasSessionOverride =
      session->isValueExists(kMaxPartitionsPerWritersSession);
  return hasSessionOverride
      ? session->get<uint32_t>(kMaxPartitionsPerWritersSession).value()
      : config_->get<uint32_t>(kMaxPartitionsPerWriters, 100);
}

// static
bool HiveConfig::s3UseVirtualAddressing(const Config* config) {
return !config->get(kS3PathStyleAccess, false);
// Reads kImmutablePartitions from the connector config, passing false as the
// fallback argument.
bool HiveConfig::immutablePartitions() const {
  const bool fallback = false;
  return config_->get<bool>(kImmutablePartitions, fallback);
}

// static
std::string HiveConfig::s3GetLogLevel(const Config* config) {
return config->get(kS3LogLevel, std::string("FATAL"));
bool HiveConfig::s3UseVirtualAddressing() const {
return !config_->get(kS3PathStyleAccess, false);
}

// static
bool HiveConfig::s3UseSSL(const Config* config) {
return config->get(kS3SSLEnabled, true);
// Reads kS3LogLevel from the connector config, passing "FATAL" as the
// fallback argument.
std::string HiveConfig::s3GetLogLevel() const {
  const std::string fallback("FATAL");
  return config_->get(kS3LogLevel, fallback);
}

// static
bool HiveConfig::s3UseInstanceCredentials(const Config* config) {
return config->get(kS3UseInstanceCredentials, false);
// Reads kS3SSLEnabled from the connector config, passing true as the fallback
// argument.
bool HiveConfig::s3UseSSL() const {
  const bool fallback = true;
  return config_->get(kS3SSLEnabled, fallback);
}

// static
std::string HiveConfig::s3Endpoint(const Config* config) {
return config->get(kS3Endpoint, std::string(""));
// Reads kS3UseInstanceCredentials from the connector config, passing false as
// the fallback argument.
bool HiveConfig::s3UseInstanceCredentials() const {
  const bool fallback = false;
  return config_->get(kS3UseInstanceCredentials, fallback);
}

// static
std::optional<std::string> HiveConfig::s3AccessKey(const Config* config) {
if (config->isValueExists(kS3AwsAccessKey)) {
return config->get(kS3AwsAccessKey).value();
}
return {};
// Reads kS3Endpoint from the connector config, passing an empty string as the
// fallback argument.
std::string HiveConfig::s3Endpoint() const {
  const std::string fallback("");
  return config_->get(kS3Endpoint, fallback);
}

// static
std::optional<std::string> HiveConfig::s3SecretKey(const Config* config) {
if (config->isValueExists(kS3AwsSecretKey)) {
return config->get(kS3AwsSecretKey).value();
// Returns the value stored under kS3AwsAccessKey in the connector config, or
// an empty optional when isValueExists() reports the key as absent.
std::optional<std::string> HiveConfig::s3AccessKey() const {
  if (!config_->isValueExists(kS3AwsAccessKey)) {
    return std::nullopt;
  }
  return config_->get(kS3AwsAccessKey).value();
}

// static
std::optional<std::string> HiveConfig::s3IAMRole(const Config* config) {
if (config->isValueExists(kS3IamRole)) {
return config->get(kS3IamRole).value();
// Returns the value stored under kS3AwsSecretKey in the connector config, or
// an empty optional when isValueExists() reports the key as absent.
std::optional<std::string> HiveConfig::s3SecretKey() const {
  if (!config_->isValueExists(kS3AwsSecretKey)) {
    return std::nullopt;
  }
  return config_->get(kS3AwsSecretKey).value();
}

// static
std::string HiveConfig::s3IAMRoleSessionName(const Config* config) {
return config->get(kS3IamRoleSessionName, std::string("velox-session"));
}

// static
std::string HiveConfig::gcsEndpoint(const Config* config) {
return config->get<std::string>(kGCSEndpoint, std::string(""));
// Returns the value stored under kS3IamRole in the connector config, or an
// empty optional when isValueExists() reports the key as absent.
std::optional<std::string> HiveConfig::s3IAMRole() const {
  if (!config_->isValueExists(kS3IamRole)) {
    return std::nullopt;
  }
  return config_->get(kS3IamRole).value();
}

// static
std::string HiveConfig::gcsScheme(const Config* config) {
return config->get<std::string>(kGCSScheme, std::string("https"));
// Reads kS3IamRoleSessionName from the connector config, passing
// "velox-session" as the fallback argument.
std::string HiveConfig::s3IAMRoleSessionName() const {
  const std::string fallback("velox-session");
  return config_->get(kS3IamRoleSessionName, fallback);
}

// static
std::string HiveConfig::gcsCredentials(const Config* config) {
return config->get<std::string>(kGCSCredentials, std::string(""));
// Reads kGCSEndpoint from the connector config, passing an empty string as
// the fallback argument.
std::string HiveConfig::gcsEndpoint() const {
  const std::string fallback("");
  return config_->get<std::string>(kGCSEndpoint, fallback);
}

// static.
bool HiveConfig::isOrcUseColumnNames(const Config* config) {
return config->get<bool>(kOrcUseColumnNames, false);
// Reads kGCSScheme from the connector config, passing "https" as the fallback
// argument.
std::string HiveConfig::gcsScheme() const {
  const std::string fallback("https");
  return config_->get<std::string>(kGCSScheme, fallback);
}

// static.
bool HiveConfig::isFileColumnNamesReadAsLowerCase(const Config* config) {
return config->get<bool>(kFileColumnNamesReadAsLowerCase, false);
// Reads kGCSCredentials from the connector config, passing an empty string as
// the fallback argument.
std::string HiveConfig::gcsCredentials() const {
  const std::string fallback("");
  return config_->get<std::string>(kGCSCredentials, fallback);
}

// static.
int64_t HiveConfig::maxCoalescedBytes(const Config* config) {
return config->get<int64_t>(kMaxCoalescedBytes, 128 << 20);
bool HiveConfig::isOrcUseColumnNames(const Config* session) const {
if (session->isValueExists(kOrcUseColumnNamesSession)) {
return session->get<bool>(kOrcUseColumnNamesSession).value();
}
return config_->get<bool>(kOrcUseColumnNames, false);
}

// static.
int32_t HiveConfig::maxCoalescedDistanceBytes(const Config* config) {
return config->get<int32_t>(kMaxCoalescedDistanceBytes, 512 << 10);
bool HiveConfig::isFileColumnNamesReadAsLowerCase(const Config* session) const {
if (session->isValueExists(kFileColumnNamesReadAsLowerCaseSession)) {
return session->get<bool>(kFileColumnNamesReadAsLowerCaseSession).value();
}
return config_->get<bool>(kFileColumnNamesReadAsLowerCase, false);
}

// static.
int32_t HiveConfig::numCacheFileHandles(const Config* config) {
return config->get<int32_t>(kNumCacheFileHandles, 20'000);
// Reads kMaxCoalescedBytes from the connector config, passing 128 << 20 as
// the fallback argument.
int64_t HiveConfig::maxCoalescedBytes() const {
  const int64_t fallback = 128 << 20;
  return config_->get<int64_t>(kMaxCoalescedBytes, fallback);
}

// static.
bool HiveConfig::isFileHandleCacheEnabled(const Config* config) {
return config->get<bool>(kEnableFileHandleCache, true);
// Reads kMaxCoalescedDistanceBytes from the connector config, passing
// 512 << 10 as the fallback argument.
int32_t HiveConfig::maxCoalescedDistanceBytes() const {
  const int32_t fallback = 512 << 10;
  return config_->get<int32_t>(kMaxCoalescedDistanceBytes, fallback);
}

// static.
uint32_t HiveConfig::sortWriterMaxOutputRows(const Config* config) {
return config->get<int32_t>(kSortWriterMaxOutputRows, 1024);
// Reads kNumCacheFileHandles from the connector config, passing 20'000 as the
// fallback argument.
int32_t HiveConfig::numCacheFileHandles() const {
  const int32_t fallback = 20'000;
  return config_->get<int32_t>(kNumCacheFileHandles, fallback);
}

// static.
uint64_t HiveConfig::sortWriterMaxOutputBytes(const Config* config) {
return config->get<uint64_t>(kSortWriterMaxOutputBytes, 10UL << 20);
// Reads kEnableFileHandleCache from the connector config, passing true as the
// fallback argument.
bool HiveConfig::isFileHandleCacheEnabled() const {
  const bool fallback = true;
  return config_->get<bool>(kEnableFileHandleCache, fallback);
}

uint64_t HiveConfig::getOrcWriterMaxStripeSize(
const Config* connectorQueryCtxConfig,
const Config* connectorPropertiesConfig) {
if (connectorQueryCtxConfig != nullptr &&
connectorQueryCtxConfig->isValueExists(kOrcWriterMaxStripeSize)) {
uint64_t HiveConfig::getOrcWriterMaxStripeSize(const Config* session) const {
if (session->isValueExists(kOrcWriterMaxStripeSizeSession)) {
return toCapacity(
connectorQueryCtxConfig->get<std::string>(kOrcWriterMaxStripeSize)
.value(),
session->get<std::string>(kOrcWriterMaxStripeSizeSession).value(),
core::CapacityUnit::BYTE);
}
if (connectorPropertiesConfig != nullptr &&
connectorPropertiesConfig->isValueExists(kOrcWriterMaxStripeSizeConfig)) {
if (config_->isValueExists(kOrcWriterMaxStripeSize)) {
return toCapacity(
connectorPropertiesConfig
->get<std::string>(kOrcWriterMaxStripeSizeConfig)
.value(),
config_->get<std::string>(kOrcWriterMaxStripeSize).value(),
core::CapacityUnit::BYTE);
}
return 64L * 1024L * 1024L;
}

uint64_t HiveConfig::getOrcWriterMaxDictionaryMemory(
const Config* connectorQueryCtxConfig,
const Config* connectorPropertiesConfig) {
if (connectorQueryCtxConfig != nullptr &&
connectorQueryCtxConfig->isValueExists(kOrcWriterMaxDictionaryMemory)) {
const Config* session) const {
if (session->isValueExists(kOrcWriterMaxDictionaryMemorySession)) {
return toCapacity(
connectorQueryCtxConfig->get<std::string>(kOrcWriterMaxDictionaryMemory)
.value(),
session->get<std::string>(kOrcWriterMaxDictionaryMemorySession).value(),
core::CapacityUnit::BYTE);
}
if (connectorPropertiesConfig != nullptr &&
connectorPropertiesConfig->isValueExists(
kOrcWriterMaxDictionaryMemoryConfig)) {
if (config_->isValueExists(kOrcWriterMaxDictionaryMemory)) {
return toCapacity(
connectorPropertiesConfig
->get<std::string>(kOrcWriterMaxDictionaryMemoryConfig)
.value(),
config_->get<std::string>(kOrcWriterMaxDictionaryMemory).value(),
core::CapacityUnit::BYTE);
}
return 16L * 1024L * 1024L;
}

// Maximum number of rows the sort writer outputs at a time. A value present
// in 'session' under kSortWriterMaxOutputRowsSession takes precedence;
// otherwise the connector config key kSortWriterMaxOutputRows is read with
// 1024 as the fallback argument.
//
// Both sources are read as uint32_t, matching the return type. Reading the
// connector config as int32_t would let a negative setting be converted
// implicitly into a very large unsigned row count, and would make the two
// sources accept different value ranges.
uint32_t HiveConfig::sortWriterMaxOutputRows(const Config* session) const {
  if (session->isValueExists(kSortWriterMaxOutputRowsSession)) {
    return session->get<uint32_t>(kSortWriterMaxOutputRowsSession).value();
  }
  return config_->get<uint32_t>(kSortWriterMaxOutputRows, 1024);
}

// Maximum number of bytes the sort writer outputs at a time. Lookup order:
//   1. 'session' under kSortWriterMaxOutputBytesSession,
//   2. the connector config under kSortWriterMaxOutputBytes,
//   3. the literal 10UL << 20.
// Values found in (1) or (2) are strings converted with toCapacity() using
// core::CapacityUnit::BYTE.
uint64_t HiveConfig::sortWriterMaxOutputBytes(const Config* session) const {
  if (session->isValueExists(kSortWriterMaxOutputBytesSession)) {
    const auto sessionValue =
        session->get<std::string>(kSortWriterMaxOutputBytesSession).value();
    return toCapacity(sessionValue, core::CapacityUnit::BYTE);
  }
  if (!config_->isValueExists(kSortWriterMaxOutputBytes)) {
    return 10UL << 20;
  }
  const auto configValue =
      config_->get<std::string>(kSortWriterMaxOutputBytes).value();
  return toCapacity(configValue, core::CapacityUnit::BYTE);
}

} // namespace facebook::velox::connector::hive
Loading

0 comments on commit eb75367

Please sign in to comment.