diff --git a/velox/connectors/hive/HiveConfig.cpp b/velox/connectors/hive/HiveConfig.cpp
index 95d46e462a7e..c2d4a27d6e5d 100644
--- a/velox/connectors/hive/HiveConfig.cpp
+++ b/velox/connectors/hive/HiveConfig.cpp
@@ -153,6 +153,11 @@ bool HiveConfig::isFileColumnNamesReadAsLowerCase(const Config* session) const {
   return config_->get(kFileColumnNamesReadAsLowerCase, false);
 }
 
+bool HiveConfig::isPartitionPathAsLowerCaseSession(
+    const Config* session) const {
+  return config_->get(kPartitionPathAsLowerCaseSession, true);
+}
+
 int64_t HiveConfig::maxCoalescedBytes() const {
   return config_->get(kMaxCoalescedBytes, 128 << 20);
 }
diff --git a/velox/connectors/hive/HiveConfig.h b/velox/connectors/hive/HiveConfig.h
index 335d7e7e6814..b2329f251c86 100644
--- a/velox/connectors/hive/HiveConfig.h
+++ b/velox/connectors/hive/HiveConfig.h
@@ -110,6 +110,9 @@ class HiveConfig {
   static constexpr const char* kFileColumnNamesReadAsLowerCaseSession =
       "file_column_names_read_as_lower_case";
 
+  static constexpr const char* kPartitionPathAsLowerCaseSession =
+      "partition_path_as_lower_case";
+
   /// Sets the max coalesce bytes for a request.
   static constexpr const char* kMaxCoalescedBytes = "max-coalesced-bytes";
 
@@ -202,6 +205,8 @@ class HiveConfig {
 
   bool isFileColumnNamesReadAsLowerCase(const Config* session) const;
 
+  bool isPartitionPathAsLowerCaseSession(const Config* session) const;
+
   int64_t maxCoalescedBytes() const;
 
   int32_t maxCoalescedDistanceBytes() const;
diff --git a/velox/connectors/hive/HiveDataSink.cpp b/velox/connectors/hive/HiveDataSink.cpp
index 367b52f83a1d..ce5f3ced1ef3 100644
--- a/velox/connectors/hive/HiveDataSink.cpp
+++ b/velox/connectors/hive/HiveDataSink.cpp
@@ -38,6 +38,28 @@ namespace facebook::velox::connector::hive {
 
 namespace {
 
+RowVectorPtr makeDataInput(
+    const std::vector<column_index_t>& partitionCols,
+    const RowVectorPtr& input,
+    const RowTypePtr& dataType) {
+  std::vector<VectorPtr> childVectors;
+  childVectors.reserve(dataType->size());
+  for (uint32_t i = 0; i < input->childrenSize(); i++) {
+    if (std::find(partitionCols.cbegin(), partitionCols.cend(), i) ==
+        partitionCols.cend()) {
+      childVectors.push_back(input->childAt(i));
+    }
+  }
+
+  return std::make_shared<RowVector>(
+      input->pool(),
+      dataType,
+      input->nulls(),
+      input->size(),
+      std::move(childVectors),
+      input->getNullCount());
+}
+
 // Returns a subset of column indices corresponding to partition keys.
 std::vector<column_index_t> getPartitionChannels(
     const std::shared_ptr<const HiveInsertTableHandle>& insertTableHandle) {
@@ -301,12 +323,15 @@ HiveDataSink::HiveDataSink(
           connectorQueryCtx->sessionProperties())),
       partitionChannels_(getPartitionChannels(insertTableHandle_)),
       partitionIdGenerator_(
-          !partitionChannels_.empty() ? std::make_unique<PartitionIdGenerator>(
-                                            inputType_,
-                                            partitionChannels_,
-                                            maxOpenWriters_,
-                                            connectorQueryCtx_->memoryPool())
-                                      : nullptr),
+          !partitionChannels_.empty()
+              ? std::make_unique<PartitionIdGenerator>(
+                    inputType_,
+                    partitionChannels_,
+                    maxOpenWriters_,
+                    connectorQueryCtx_->memoryPool(),
+                    hiveConfig_->isPartitionPathAsLowerCaseSession(
+                        connectorQueryCtx->sessionProperties()))
+              : nullptr),
       bucketCount_(
           insertTableHandle_->bucketProperty() == nullptr
               ? 0
@@ -331,6 +356,18 @@ HiveDataSink::HiveDataSink(
       "Unsupported commit strategy: {}",
       commitStrategyToString(commitStrategy_));
 
+  // Get the data file type, i.e. inputType_ minus the partition channels.
+  std::vector<TypePtr> childTypes;
+  std::vector<std::string> childNames;
+  for (auto i = 0; i < inputType_->size(); i++) {
+    if (std::find(partitionChannels_.cbegin(), partitionChannels_.cend(), i) ==
+        partitionChannels_.cend()) {
+      childNames.push_back(inputType_->nameOf(i));
+      childTypes.push_back(inputType_->childAt(i));
+    }
+  }
+  dataType_ = ROW(std::move(childNames), std::move(childTypes));
+
   if (!isBucketed()) {
     return;
   }
@@ -400,8 +437,17 @@ void HiveDataSink::appendData(RowVectorPtr input) {
 
 void HiveDataSink::write(size_t index, const VectorPtr& input) {
   WRITER_NON_RECLAIMABLE_SECTION_GUARD(index);
-  writers_[index]->write(input);
-  writerInfo_[index]->numWrittenRows += input->size();
+  // Skip the partition columns before writing.
+  auto dataInput = input;
+  if (!isBucketed()) {
+    dataInput = makeDataInput(
+        partitionChannels_,
+        std::dynamic_pointer_cast<RowVector>(input),
+        dataType_);
+  }
+
+  writers_[index]->write(dataInput);
+  writerInfo_[index]->numWrittenRows += dataInput->size();
 }
 
 std::string HiveDataSink::stateString(State state) {
@@ -597,7 +643,12 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
   dwio::common::WriterOptions options;
   const auto* connectorSessionProperties =
       connectorQueryCtx_->sessionProperties();
-  options.schema = inputType_;
+  if (!isBucketed()) {
+    options.schema = dataType_;
+  } else {
+    options.schema = inputType_;
+  }
+
   options.memoryPool = writerInfo_.back()->writerPool.get();
   options.compressionKind = insertTableHandle_->compressionKind();
   if (canReclaim()) {
@@ -737,6 +788,12 @@ std::pair<std::string, std::string> HiveDataSink::getWriterFileNames(
   const std::string writeFileName = isCommitRequired()
       ? fmt::format(".tmp.velox.{}_{}", targetFileName, makeUuid())
       : targetFileName;
+  if (insertTableHandle_->tableStorageFormat() ==
+      dwio::common::FileFormat::PARQUET) {
+    return {
+        fmt::format("{}{}", targetFileName, ".parquet"),
+        fmt::format("{}{}", writeFileName, ".parquet")};
+  }
   return {targetFileName, writeFileName};
 }
 
diff --git a/velox/connectors/hive/HiveDataSink.h b/velox/connectors/hive/HiveDataSink.h
index a4ea8f2d7dc3..ac63720ccd25 100644
--- a/velox/connectors/hive/HiveDataSink.h
+++ b/velox/connectors/hive/HiveDataSink.h
@@ -541,6 +541,8 @@ class HiveDataSink : public DataSink {
   void closeInternal();
 
   const RowTypePtr inputType_;
+  // Type of the data columns written to the file; excludes partition columns.
+  RowTypePtr dataType_;
   const std::shared_ptr<const HiveInsertTableHandle> insertTableHandle_;
   const ConnectorQueryCtx* const connectorQueryCtx_;
   const CommitStrategy commitStrategy_;
diff --git a/velox/connectors/hive/PartitionIdGenerator.cpp b/velox/connectors/hive/PartitionIdGenerator.cpp
index e795a08962b1..7d4734cd1996 100644
--- a/velox/connectors/hive/PartitionIdGenerator.cpp
+++ b/velox/connectors/hive/PartitionIdGenerator.cpp
@@ -27,9 +27,11 @@ PartitionIdGenerator::PartitionIdGenerator(
     const RowTypePtr& inputType,
     std::vector<column_index_t> partitionChannels,
     uint32_t maxPartitions,
-    memory::MemoryPool* pool)
+    memory::MemoryPool* pool,
+    bool partitionPathAsLowerCase)
     : partitionChannels_(std::move(partitionChannels)),
-      maxPartitions_(maxPartitions) {
+      maxPartitions_(maxPartitions),
+      partitionPathAsLowerCase_(partitionPathAsLowerCase) {
   VELOX_USER_CHECK(
       !partitionChannels_.empty(), "There must be at least one partition key.");
   for (auto channel : partitionChannels_) {
@@ -98,7 +100,8 @@ void PartitionIdGenerator::run(
 
 std::string PartitionIdGenerator::partitionName(uint64_t partitionId) const {
   return FileUtils::makePartName(
-      extractPartitionKeyValues(partitionValues_, partitionId));
+      extractPartitionKeyValues(partitionValues_, partitionId),
+      partitionPathAsLowerCase_);
 }
 
 void PartitionIdGenerator::computeValueIds(
diff --git a/velox/connectors/hive/PartitionIdGenerator.h b/velox/connectors/hive/PartitionIdGenerator.h
index 1a4844398b0e..4f3c6c214e79 100644
--- a/velox/connectors/hive/PartitionIdGenerator.h
+++ b/velox/connectors/hive/PartitionIdGenerator.h
@@ -34,7 +34,8 @@ class PartitionIdGenerator {
       const RowTypePtr& inputType,
       std::vector<column_index_t> partitionChannels,
       uint32_t maxPartitions,
-      memory::MemoryPool* pool);
+      memory::MemoryPool* pool,
+      bool partitionPathAsLowerCase = true);
 
   /// Generate sequential partition IDs for input vector.
   /// @param input Input RowVector.
@@ -77,6 +78,8 @@ class PartitionIdGenerator {
 
   const uint32_t maxPartitions_;
 
+  bool partitionPathAsLowerCase_;
+
   std::vector<std::unique_ptr<exec::VectorHasher>> hashers_;
 
   // A mapping from value ID produced by VectorHashers to a partition ID.
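Note on the HiveDataSink changes above: dataType_ is the written-file schema, i.e. inputType_ with the partition channels removed, and makeDataInput() drops the same children from every input batch, so data files of unbucketed partitioned tables no longer carry partition columns; their values appear only in the partition directory name produced by PartitionIdGenerator. A minimal standalone sketch of that type derivation using the public Velox type factories; the helper name dropPartitionChannels and the example columns are illustrative only, not part of the patch:

#include <algorithm>
#include <string>
#include <vector>

#include "velox/type/Type.h"

using namespace facebook::velox;

// Drops the listed channel indices from a row type; mirrors how
// HiveDataSink derives dataType_ from inputType_.
RowTypePtr dropPartitionChannels(
    const RowTypePtr& inputType,
    const std::vector<uint32_t>& partitionChannels) {
  std::vector<std::string> names;
  std::vector<TypePtr> types;
  for (uint32_t i = 0; i < inputType->size(); ++i) {
    if (std::find(partitionChannels.begin(), partitionChannels.end(), i) ==
        partitionChannels.end()) {
      names.push_back(inputType->nameOf(i));
      types.push_back(inputType->childAt(i));
    }
  }
  return ROW(std::move(names), std::move(types));
}

// Example: ROW({"c0", "c1", "c2"}, {BIGINT(), VARCHAR(), DOUBLE()}) with
// partition channels {1} yields ROW({"c0", "c2"}, {BIGINT(), DOUBLE()}).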
diff --git a/velox/dwio/catalog/fbhive/FileUtils.cpp b/velox/dwio/catalog/fbhive/FileUtils.cpp
index 589a5060e766..4de7f7d092e5 100644
--- a/velox/dwio/catalog/fbhive/FileUtils.cpp
+++ b/velox/dwio/catalog/fbhive/FileUtils.cpp
@@ -158,14 +158,20 @@ std::string FileUtils::unescapePathName(const std::string& data) {
 }
 
 std::string FileUtils::makePartName(
-    const std::vector<std::pair<std::string, std::string>>& entries) {
+    const std::vector<std::pair<std::string, std::string>>& entries,
+    bool partitionPathAsLowerCase) {
   size_t size = 0;
   size_t escapeCount = 0;
   std::for_each(entries.begin(), entries.end(), [&](auto& pair) {
     auto keySize = pair.first.size();
     DWIO_ENSURE_GT(keySize, 0);
     size += keySize;
-    escapeCount += countEscape(pair.first);
+    if (partitionPathAsLowerCase) {
+      escapeCount += countEscape(toLower(pair.first));
+    } else {
+      escapeCount += countEscape(pair.first);
+    }
+
     auto valSize = pair.second.size();
     if (valSize == 0) {
       size += DEFAULT_PARTITION_VALUE.size();
@@ -182,7 +188,11 @@ std::string FileUtils::makePartName(
     if (ret.size() > 0) {
       ret += "/";
     }
-    ret += escapePathName(toLower(pair.first));
+    if (partitionPathAsLowerCase) {
+      ret += escapePathName(toLower(pair.first));
+    } else {
+      ret += escapePathName(pair.first);
+    }
     ret += "=";
     if (pair.second.size() == 0) {
       ret += DEFAULT_PARTITION_VALUE;
diff --git a/velox/dwio/catalog/fbhive/FileUtils.h b/velox/dwio/catalog/fbhive/FileUtils.h
index 1c759ffb19e0..540798588486 100644
--- a/velox/dwio/catalog/fbhive/FileUtils.h
+++ b/velox/dwio/catalog/fbhive/FileUtils.h
@@ -40,7 +40,8 @@ class FileUtils {
   /// Creates the partition directory path from the list of partition key/value
   /// pairs, will do url-encoding when needed.
   static std::string makePartName(
-      const std::vector<std::pair<std::string, std::string>>& entries);
+      const std::vector<std::pair<std::string, std::string>>& entries,
+      bool partitionPathAsLowerCase = true);
 
   /// Converts the hive-metastore-compliant path name back to the corresponding
   /// partition key/value pairs.
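The flag added to FileUtils::makePartName only affects the partition key names: when partitionPathAsLowerCase is true (the default, matching the previous unconditional behavior) keys are lowercased before escaping, and when it is false the original case is preserved; values are never case-folded either way. A small usage sketch (illustrative only; the example key/value pairs are hypothetical and the expected strings assume the flag is honored as described):

#include <iostream>
#include <string>
#include <utility>
#include <vector>

#include "velox/dwio/catalog/fbhive/FileUtils.h"

using facebook::velox::dwio::catalog::fbhive::FileUtils;

int main() {
  std::vector<std::pair<std::string, std::string>> entries = {
      {"DS", "2023-01-01"}, {"Country", "US"}};

  // Default: keys lowercased, values untouched -> "ds=2023-01-01/country=US".
  std::cout << FileUtils::makePartName(entries, true) << std::endl;

  // Case preserved -> "DS=2023-01-01/Country=US".
  std::cout << FileUtils::makePartName(entries, false) << std::endl;
  return 0;
}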
diff --git a/velox/dwio/catalog/fbhive/test/FileUtilsTests.cpp b/velox/dwio/catalog/fbhive/test/FileUtilsTests.cpp
index 4abe1c4905e0..2a624b9bcd70 100644
--- a/velox/dwio/catalog/fbhive/test/FileUtilsTests.cpp
+++ b/velox/dwio/catalog/fbhive/test/FileUtilsTests.cpp
@@ -28,7 +28,7 @@ TEST(FileUtilsTests, MakePartName) {
       {"ds", "2016-01-01"}, {"FOO", ""}, {"a\nb:c", "a#b=c"}};
   ASSERT_EQ(
-      FileUtils::makePartName(pairs),
-      "ds=2016-01-01/foo=__HIVE_DEFAULT_PARTITION__/a%0Ab%3Ac=a%23b%3Dc");
+      FileUtils::makePartName(pairs, /*partitionPathAsLowerCase=*/false),
+      "ds=2016-01-01/FOO=__HIVE_DEFAULT_PARTITION__/a%0Ab%3Ac=a%23b%3Dc");
 }
 
 TEST(FileUtilsTests, ParsePartKeyValues) {
diff --git a/velox/exec/tests/TableWriteTest.cpp b/velox/exec/tests/TableWriteTest.cpp
index 27ca50f499e9..871afbb82920 100644
--- a/velox/exec/tests/TableWriteTest.cpp
+++ b/velox/exec/tests/TableWriteTest.cpp
@@ -108,6 +108,23 @@ std::function<PlanNodePtr(std::string, PlanNodePtr)> addTableWriter(
   };
 }
 
+RowTypePtr getScanOutput(
+    const std::vector<std::string>& partitionedKeys,
+    const RowTypePtr& rowType) {
+  std::vector<std::string> dataColumnNames;
+  std::vector<TypePtr> dataColumnTypes;
+  for (auto i = 0; i < rowType->size(); i++) {
+    auto name = rowType->names()[i];
+    if (std::find(partitionedKeys.cbegin(), partitionedKeys.cend(), name) ==
+        partitionedKeys.cend()) {
+      dataColumnNames.emplace_back(name);
+      dataColumnTypes.emplace_back(rowType->findChild(name));
+    }
+  }
+
+  return ROW(std::move(dataColumnNames), std::move(dataColumnTypes));
+}
+
 FOLLY_ALWAYS_INLINE std::ostream& operator<<(std::ostream& os, TestMode mode) {
   os << testModeString(mode);
   return os;
 }
@@ -766,14 +783,28 @@ class TableWriteTest : public HiveConnectorTestBase {
       const std::string& targetDir) {
     verifyPartitionedDirPath(filePath, targetDir);
     if (commitStrategy_ == CommitStrategy::kNoCommit) {
-      ASSERT_TRUE(RE2::FullMatch(
-          filePath.filename().string(), "0[0-9]+_0_TaskCursorQuery_[0-9]+"))
-          << filePath.filename().string();
+      if (fileFormat_ == FileFormat::PARQUET) {
+        ASSERT_TRUE(RE2::FullMatch(
+            filePath.filename().string(),
+            "0[0-9]+_0_TaskCursorQuery_[0-9]+\\.parquet$"))
+            << filePath.filename().string();
+      } else {
+        ASSERT_TRUE(RE2::FullMatch(
+            filePath.filename().string(), "0[0-9]+_0_TaskCursorQuery_[0-9]+"))
+            << filePath.filename().string();
+      }
     } else {
-      ASSERT_TRUE(RE2::FullMatch(
-          filePath.filename().string(),
-          ".tmp.velox.0[0-9]+_0_TaskCursorQuery_[0-9]+_.+"))
-          << filePath.filename().string();
+      if (fileFormat_ == FileFormat::PARQUET) {
+        ASSERT_TRUE(RE2::FullMatch(
+            filePath.filename().string(),
+            ".tmp.velox.0[0-9]+_0_TaskCursorQuery_[0-9]+_.+\\.parquet$"))
+            << filePath.filename().string();
+      } else {
+        ASSERT_TRUE(RE2::FullMatch(
+            filePath.filename().string(),
+            ".tmp.velox.0[0-9]+_0_TaskCursorQuery_[0-9]+_.+"))
+            << filePath.filename().string();
+      }
     }
   }
 
@@ -823,12 +854,22 @@ class TableWriteTest : public HiveConnectorTestBase {
   void verifyPartitionedFilesData(
      const std::vector<std::filesystem::path>& filePaths,
      const std::filesystem::path& dirPath) {
-    HiveConnectorTestBase::assertQuery(
-        PlanBuilder().tableScan(rowType_).planNode(),
-        {makeHiveConnectorSplits(filePaths)},
-        fmt::format(
-            "SELECT * FROM tmp WHERE {}",
-            partitionNameToPredicate(getPartitionDirNames(dirPath))));
+    if (bucketProperty_ != nullptr) {
+      HiveConnectorTestBase::assertQuery(
+          PlanBuilder().tableScan(rowType_).planNode(),
+          {makeHiveConnectorSplits(filePaths)},
+          fmt::format(
+              "SELECT * FROM tmp WHERE {}",
+              partitionNameToPredicate(getPartitionDirNames(dirPath))));
+
+    } else {
+      HiveConnectorTestBase::assertQuery(
+          PlanBuilder().tableScan(rowType_).planNode(),
+          {makeHiveConnectorSplits(filePaths)},
+          fmt::format(
+              "SELECT c2, c3, c4, c5 FROM tmp WHERE {}",
+              partitionNameToPredicate(getPartitionDirNames(dirPath))));
+    }
   }
 
   // Gets the hash function used by the production code to build bucket id.
@@ -1479,12 +1520,20 @@ TEST_P(AllTableWriterTest, scanFilterProjectWrite) {
   // To test the correctness of the generated output,
   // We create a new plan that only read that file and then
   // compare that against a duckDB query that runs the whole query.
-  assertQuery(
-      PlanBuilder().tableScan(outputType).planNode(),
-      makeHiveConnectorSplits(outputDirectory),
-      "SELECT c0, c1, c3, c5, c2 + c3, substr(c5, 1, 1) FROM tmp WHERE c2 <> 0");
-
-  verifyTableWriterOutput(outputDirectory->path, outputType, false);
+  if (partitionedBy_.size() > 0 && bucketProperty_ == nullptr) {
+    auto newOutputType = getScanOutput(partitionedBy_, outputType);
+    assertQuery(
+        PlanBuilder().tableScan(newOutputType).planNode(),
+        makeHiveConnectorSplits(outputDirectory),
+        "SELECT c3, c5, c2 + c3, substr(c5, 1, 1) FROM tmp WHERE c2 <> 0");
+    verifyTableWriterOutput(outputDirectory->path, newOutputType, false);
+  } else {
+    assertQuery(
+        PlanBuilder().tableScan(outputType).planNode(),
+        makeHiveConnectorSplits(outputDirectory),
+        "SELECT c0, c1, c3, c5, c2 + c3, substr(c5, 1, 1) FROM tmp WHERE c2 <> 0");
+    verifyTableWriterOutput(outputDirectory->path, outputType, false);
+  }
 }
 
 TEST_P(AllTableWriterTest, renameAndReorderColumns) {
@@ -1533,12 +1582,22 @@ TEST_P(AllTableWriterTest, renameAndReorderColumns) {
 
   assertQueryWithWriterConfigs(plan, filePaths, "SELECT count(*) FROM tmp");
 
-  HiveConnectorTestBase::assertQuery(
-      PlanBuilder().tableScan(tableSchema_).planNode(),
-      makeHiveConnectorSplits(outputDirectory),
-      "SELECT c2, c5, c4, c1, c0, c3 FROM tmp");
+  if (partitionedBy_.size() > 0 && bucketProperty_ == nullptr) {
+    auto newOutputType = getScanOutput(partitionedBy_, tableSchema_);
+    HiveConnectorTestBase::assertQuery(
+        PlanBuilder().tableScan(newOutputType).planNode(),
+        makeHiveConnectorSplits(outputDirectory),
+        "SELECT c2, c5, c4, c3 FROM tmp");
+
+    verifyTableWriterOutput(outputDirectory->path, newOutputType, false);
+  } else {
+    HiveConnectorTestBase::assertQuery(
+        PlanBuilder().tableScan(tableSchema_).planNode(),
+        makeHiveConnectorSplits(outputDirectory),
+        "SELECT c2, c5, c4, c1, c0, c3 FROM tmp");
 
-  verifyTableWriterOutput(outputDirectory->path, tableSchema_, false);
+    verifyTableWriterOutput(outputDirectory->path, tableSchema_, false);
+  }
 }
 
 // Runs a pipeline with read + write.
@@ -1569,12 +1628,22 @@ TEST_P(AllTableWriterTest, directReadWrite) {
 
   // We create a new plan that only read that file and then
   // compare that against a duckDB query that runs the whole query.
-  assertQuery(
-      PlanBuilder().tableScan(rowType_).planNode(),
-      makeHiveConnectorSplits(outputDirectory),
-      "SELECT * FROM tmp");
+  if (partitionedBy_.size() > 0 && bucketProperty_ == nullptr) {
+    auto newOutputType = getScanOutput(partitionedBy_, tableSchema_);
+    assertQuery(
+        PlanBuilder().tableScan(newOutputType).planNode(),
+        makeHiveConnectorSplits(outputDirectory),
+        "SELECT c2, c3, c4, c5 FROM tmp");
+    rowType_ = newOutputType;
+    verifyTableWriterOutput(outputDirectory->path, rowType_);
+  } else {
+    assertQuery(
+        PlanBuilder().tableScan(rowType_).planNode(),
+        makeHiveConnectorSplits(outputDirectory),
+        "SELECT * FROM tmp");
 
-  verifyTableWriterOutput(outputDirectory->path, rowType_);
+    verifyTableWriterOutput(outputDirectory->path, rowType_);
+  }
 }
 
 // Tests writing constant vectors.
@@ -1600,12 +1669,22 @@ TEST_P(AllTableWriterTest, constantVectors) {
 
   assertQuery(op, fmt::format("SELECT {}", size));
 
-  assertQuery(
-      PlanBuilder().tableScan(rowType_).planNode(),
-      makeHiveConnectorSplits(outputDirectory),
-      "SELECT * FROM tmp");
+  if (partitionedBy_.size() > 0 && bucketProperty_ == nullptr) {
+    auto newOutputType = getScanOutput(partitionedBy_, tableSchema_);
+    assertQuery(
+        PlanBuilder().tableScan(newOutputType).planNode(),
+        makeHiveConnectorSplits(outputDirectory),
+        "SELECT c2, c3, c4, c5 FROM tmp");
+    rowType_ = newOutputType;
+    verifyTableWriterOutput(outputDirectory->path, rowType_);
+  } else {
+    assertQuery(
+        PlanBuilder().tableScan(rowType_).planNode(),
+        makeHiveConnectorSplits(outputDirectory),
+        "SELECT * FROM tmp");
 
-  verifyTableWriterOutput(outputDirectory->path, rowType_);
+    verifyTableWriterOutput(outputDirectory->path, rowType_);
+  }
 }
 
 TEST_P(AllTableWriterTest, emptyInput) {
@@ -1649,11 +1728,23 @@ TEST_P(AllTableWriterTest, commitStrategies) {
 
     assertQuery(plan, "SELECT count(*) FROM tmp");
 
-    assertQuery(
-        PlanBuilder().tableScan(rowType_).planNode(),
-        makeHiveConnectorSplits(outputDirectory),
-        "SELECT * FROM tmp");
-    verifyTableWriterOutput(outputDirectory->path, rowType_);
+    if (partitionedBy_.size() > 0 && bucketProperty_ == nullptr) {
+      auto newOutputType = getScanOutput(partitionedBy_, tableSchema_);
+      assertQuery(
+          PlanBuilder().tableScan(newOutputType).planNode(),
+          makeHiveConnectorSplits(outputDirectory),
+          "SELECT c2, c3, c4, c5 FROM tmp");
+      auto originalRowType = rowType_;
+      rowType_ = newOutputType;
+      verifyTableWriterOutput(outputDirectory->path, rowType_);
+      rowType_ = originalRowType;
+    } else {
+      assertQuery(
+          PlanBuilder().tableScan(rowType_).planNode(),
+          makeHiveConnectorSplits(outputDirectory),
+          "SELECT * FROM tmp");
+      verifyTableWriterOutput(outputDirectory->path, rowType_);
+    }
   }
   // Test kNoCommit commit strategy writing to non-temporary files.
   {
@@ -1673,11 +1764,21 @@ TEST_P(AllTableWriterTest, commitStrategies) {
 
     assertQuery(plan, "SELECT count(*) FROM tmp");
 
-    assertQuery(
-        PlanBuilder().tableScan(rowType_).planNode(),
-        makeHiveConnectorSplits(outputDirectory),
-        "SELECT * FROM tmp");
-    verifyTableWriterOutput(outputDirectory->path, rowType_);
+    if (partitionedBy_.size() > 0 && bucketProperty_ == nullptr) {
+      auto newOutputType = getScanOutput(partitionedBy_, tableSchema_);
+      assertQuery(
+          PlanBuilder().tableScan(newOutputType).planNode(),
+          makeHiveConnectorSplits(outputDirectory),
+          "SELECT c2, c3, c4, c5 FROM tmp");
+      rowType_ = newOutputType;
+      verifyTableWriterOutput(outputDirectory->path, rowType_);
+    } else {
+      assertQuery(
+          PlanBuilder().tableScan(rowType_).planNode(),
+          makeHiveConnectorSplits(outputDirectory),
+          "SELECT * FROM tmp");
+      verifyTableWriterOutput(outputDirectory->path, rowType_);
+    }
   }
 }
 
@@ -1859,12 +1960,13 @@ TEST_P(PartitionedTableWriterTest, multiplePartitions) {
   // Verify distribution of records in partition directories.
   auto iterPartitionDirectory = actualPartitionDirectories.begin();
   auto iterPartitionName = partitionNames.begin();
+  auto newOutputType = getScanOutput(partitionKeys, rowType);
   while (iterPartitionDirectory != actualPartitionDirectories.end()) {
     assertQuery(
-        PlanBuilder().tableScan(rowType).planNode(),
+        PlanBuilder().tableScan(newOutputType).planNode(),
         makeHiveConnectorSplits(*iterPartitionDirectory),
         fmt::format(
-            "SELECT * FROM tmp WHERE {}",
+            "SELECT c0, c1, c3, c5 FROM tmp WHERE {}",
             partitionNameToPredicate(*iterPartitionName, partitionTypes)));
     // In case of unbucketed partitioned table, one single file is written to
     // each partition directory for Hive connector.
@@ -1933,10 +2035,11 @@ TEST_P(PartitionedTableWriterTest, singlePartition) {
       fs::path(outputDirectory->path) / "p0=365");
 
   // Verify all data is written to the single partition directory.
+  auto newOutputType = getScanOutput(partitionKeys, rowType);
   assertQuery(
-      PlanBuilder().tableScan(rowType).planNode(),
+      PlanBuilder().tableScan(newOutputType).planNode(),
       makeHiveConnectorSplits(outputDirectory),
-      "SELECT * FROM tmp");
+      "SELECT c0, c3, c5 FROM tmp");
 
   // In case of unbucketed partitioned table, one single file is written to
   // each partition directory for Hive connector.
@@ -1979,10 +2082,11 @@ TEST_P(PartitionedWithoutBucketTableWriterTest, fromSinglePartitionToMultiple) {
 
   assertQueryWithWriterConfigs(plan, "SELECT count(*) FROM tmp");
 
+  auto newOutputType = getScanOutput(partitionKeys, rowType);
   assertQuery(
-      PlanBuilder().tableScan(rowType).planNode(),
+      PlanBuilder().tableScan(newOutputType).planNode(),
       makeHiveConnectorSplits(outputDirectory),
-      "SELECT * FROM tmp");
+      "SELECT c1 FROM tmp");
 }
 
 TEST_P(PartitionedTableWriterTest, maxPartitions) {
@@ -2449,7 +2553,15 @@ TEST_P(AllTableWriterTest, tableWriteOutputCheck) {
       if (commitStrategy_ == CommitStrategy::kNoCommit) {
         ASSERT_EQ(writeFileName, targetFileName);
       } else {
-        ASSERT_TRUE(writeFileName.find(targetFileName) != std::string::npos);
+        const std::string kParquetSuffix = ".parquet";
+        if (folly::StringPiece(targetFileName).endsWith(kParquetSuffix)) {
+          // Remove the .parquet suffix before matching.
+          auto trimmedFilename = targetFileName.substr(
+              0, targetFileName.size() - kParquetSuffix.size());
+          ASSERT_TRUE(writeFileName.find(trimmedFilename) != std::string::npos);
+        } else {
+          ASSERT_TRUE(writeFileName.find(targetFileName) != std::string::npos);
+        }
       }
     }
     if (!commitContextVector->isNullAt(i)) {
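For reference, the file-naming rule introduced in HiveDataSink::getWriterFileNames and exercised by the check above: the temporary write file embeds the target name plus a UUID, and Parquet output gets a ".parquet" extension on both names, which is why the test strips that extension before matching. A standalone sketch of the same rule (illustrative only, not part of the patch; the uuid argument stands in for the production makeUuid() helper):

#include <string>
#include <utility>

// Mirrors the naming logic added to HiveDataSink::getWriterFileNames:
// a commit-required write goes to a ".tmp.velox.<target>_<uuid>" file,
// and Parquet output carries a ".parquet" extension on both names.
std::pair<std::string, std::string> writerFileNames(
    const std::string& targetFileName,
    bool commitRequired,
    bool isParquet,
    const std::string& uuid) {
  std::string writeFileName = commitRequired
      ? ".tmp.velox." + targetFileName + "_" + uuid
      : targetFileName;
  if (isParquet) {
    return {targetFileName + ".parquet", writeFileName + ".parquet"};
  }
  return {targetFileName, writeFileName};
}

// Example: writerFileNames("0001_0_Task_1", true, true, "abc") returns
// {"0001_0_Task_1.parquet", ".tmp.velox.0001_0_Task_1_abc.parquet"}.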