From dc7afc6edf374bf3f66b43452f56b192f21a196a Mon Sep 17 00:00:00 2001 From: ffacs Date: Sat, 13 Jul 2024 00:27:23 +0800 Subject: [PATCH] ORC-1390: [C++] Support schema evolution from string group to decimal/timestamp ### What changes were proposed in this pull request? 1. Support schema evolution from `{stirng, char, varchar}` to `{decimal,timestamp,timestamp_instant}` 2. Fix a bug that cannot convert from `varchar` to `varchar` and `char` to `char` ### Why are the changes needed? To support Schema evolution at c++ side. ### How was this patch tested? UT passed ### Was this patch authored or co-authored using generative AI tooling? NO Closes #1949 from ffacs/ORC-1390. Authored-by: ffacs Signed-off-by: Gang Wu --- c++/src/ConvertColumnReader.cc | 304 ++++++++++++++++++++++++++-- c++/src/SchemaEvolution.cc | 3 +- c++/test/TestConvertColumnReader.cc | 246 ++++++++++++++++++++++ c++/test/TestSchemaEvolution.cc | 34 +++- 4 files changed, 558 insertions(+), 29 deletions(-) diff --git a/c++/src/ConvertColumnReader.cc b/c++/src/ConvertColumnReader.cc index a24b8cb053..a9003bc163 100644 --- a/c++/src/ConvertColumnReader.cc +++ b/c++/src/ConvertColumnReader.cc @@ -19,6 +19,8 @@ #include "ConvertColumnReader.hh" #include "Utils.hh" +#include + namespace orc { // Assume that we are using tight numeric vector batch @@ -73,6 +75,23 @@ namespace orc { } } + static inline void handleParseFromStringError(ColumnVectorBatch& dstBatch, uint64_t idx, + bool shouldThrow, const std::string& typeName, + const std::string& str, + const std::string& expectedFormat = "") { + if (!shouldThrow) { + dstBatch.notNull.data()[idx] = 0; + dstBatch.hasNulls = true; + } else { + std::ostringstream ss; + ss << "Failed to parse " << typeName << " from string:" << str; + if (expectedFormat != "") { + ss << " the following format \"" << expectedFormat << "\" is expected"; + } + throw SchemaEvolutionError(ss.str()); + } + } + // return false if overflow template static bool downCastToInteger(ReadType& dstValue, int64_t inputLong) { @@ -400,13 +419,14 @@ namespace orc { ConvertToTimestampColumnReader(const Type& readType, const Type& fileType, StripeStreams& stripe, bool throwOnOverflow) : ConvertColumnReader(readType, fileType, stripe, throwOnOverflow), - readerTimezone(readType.getKind() == TIMESTAMP_INSTANT ? &getTimezoneByName("GMT") - : &stripe.getReaderTimezone()), + isInstant(readType.getKind() == TIMESTAMP_INSTANT), + readerTimezone(isInstant ? &getTimezoneByName("GMT") : &stripe.getReaderTimezone()), needConvertTimezone(readerTimezone != &getTimezoneByName("GMT")) {} void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override; protected: + const bool isInstant; const orc::Timezone* readerTimezone; const bool needConvertTimezone; }; @@ -722,10 +742,11 @@ namespace orc { void convertToInteger(ReadTypeBatch& dstBatch, const StringVectorBatch& srcBatch, uint64_t idx) { int64_t longValue = 0; + const std::string longStr(srcBatch.data[idx], srcBatch.length[idx]); try { - longValue = std::stoll(std::string(srcBatch.data[idx], srcBatch.length[idx])); + longValue = std::stoll(longStr); } catch (...) { - handleOverflow(dstBatch, idx, throwOnOverflow); + handleParseFromStringError(dstBatch, idx, throwOnOverflow, "Long", longStr); return; } if constexpr (std::is_same_v) { @@ -738,14 +759,16 @@ namespace orc { } void convertToDouble(ReadTypeBatch& dstBatch, const StringVectorBatch& srcBatch, uint64_t idx) { + const std::string floatValue(srcBatch.data[idx], srcBatch.length[idx]); try { if constexpr (std::is_same_v) { - dstBatch.data[idx] = std::stof(std::string(srcBatch.data[idx], srcBatch.length[idx])); + dstBatch.data[idx] = std::stof(floatValue); } else { - dstBatch.data[idx] = std::stod(std::string(srcBatch.data[idx], srcBatch.length[idx])); + dstBatch.data[idx] = std::stod(floatValue); } } catch (...) { - handleOverflow(dstBatch, idx, throwOnOverflow); + handleParseFromStringError(dstBatch, idx, throwOnOverflow, typeid(readType).name(), + floatValue); } } }; @@ -801,6 +824,209 @@ namespace orc { } }; + class StringVariantToTimestampColumnReader : public ConvertToTimestampColumnReader { + public: + StringVariantToTimestampColumnReader(const Type& readType, const Type& fileType, + StripeStreams& stripe, bool throwOnOverflow) + : ConvertToTimestampColumnReader(readType, fileType, stripe, throwOnOverflow) {} + + void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override { + ConvertToTimestampColumnReader::next(rowBatch, numValues, notNull); + + const auto& srcBatch = *SafeCastBatchTo(data.get()); + auto& dstBatch = *SafeCastBatchTo(&rowBatch); + + for (uint64_t i = 0; i < numValues; ++i) { + if (!rowBatch.hasNulls || rowBatch.notNull[i]) { + convertToTimestamp(dstBatch, i, std::string(srcBatch.data[i], srcBatch.length[i])); + } + } + } + + private: + // Algorithm: http://howardhinnant.github.io/date_algorithms.html + // The algorithm implements a proleptic Gregorian calendar. + int64_t daysFromProlepticGregorianCalendar(int32_t y, int32_t m, int32_t d) { + y -= m <= 2; + int32_t era = y / 400; + int32_t yoe = y - era * 400; // [0, 399] + int32_t doy = (153 * (m + (m > 2 ? -3 : 9)) + 2) / 5 + d - 1; // [0, 365] + int32_t doe = yoe * 365 + yoe / 4 - yoe / 100 + doy; // [0, 146096] + return 1ll * era * 146097 + doe - 719468; + } + + std::optional> tryBestToParseFromString( + const std::string& timeStr) { + int32_t year, month, day, hour, min, sec, nanos = 0; + int32_t matched = std::sscanf(timeStr.c_str(), "%4d-%2d-%2d %2d:%2d:%2d.%d", &year, &month, + &day, &hour, &min, &sec, &nanos); + if (matched != 6 && matched != 7) { + return std::nullopt; + } + if (nanos) { + if (nanos < 0 || nanos >= 1e9) { + return std::nullopt; + } + while (nanos < static_cast(1e8)) { + nanos *= 10; + } + } + int64_t daysSinceEpoch = daysFromProlepticGregorianCalendar(year, month, day); + int64_t secondSinceEpoch = 60ll * (60 * (24L * daysSinceEpoch + hour) + min) + sec; + return std::make_optional(std::pair{secondSinceEpoch, nanos}); + } + + void convertToTimestamp(TimestampVectorBatch& dstBatch, uint64_t idx, + const std::string& timeStr) { + // Expected timestamp_instant format string : yyyy-mm-dd hh:mm:ss[.xxx] timezone + // Eg. "2019-07-09 13:11:00 America/Los_Angeles" + // Expected timestamp format string : yyyy-mm-dd hh:mm:ss[.xxx] + // Eg. "2019-07-09 13:11:00" + static std::string expectedTimestampInstantFormat = "yyyy-mm-dd hh:mm:ss[.xxx] timezone"; + static std::string expectedTimestampFormat = "yyyy-mm-dd hh:mm:ss[.xxx]"; + auto timestamp = tryBestToParseFromString(timeStr); + if (!timestamp.has_value()) { + if (!isInstant) { + handleParseFromStringError(dstBatch, idx, throwOnOverflow, "Timestamp", timeStr, + expectedTimestampFormat); + return; + } + handleParseFromStringError(dstBatch, idx, throwOnOverflow, "Timestamp_Instant", timeStr, + expectedTimestampInstantFormat); + return; + } + + auto& [second, nanos] = timestamp.value(); + + if (isInstant) { + size_t pos = 0; // get the name of timezone + pos = timeStr.find(' ', pos) + 1; + pos = timeStr.find(' ', pos); + if (pos == std::string::npos) { + handleParseFromStringError(dstBatch, idx, throwOnOverflow, "Timestamp_Instant", timeStr, + expectedTimestampInstantFormat); + return; + } + pos += 1; + size_t subStrLength = timeStr.length() - pos; + try { + second = getTimezoneByName(timeStr.substr(pos, subStrLength)).convertFromUTC(second); + } catch (const TimezoneError&) { + handleParseFromStringError(dstBatch, idx, throwOnOverflow, "Timestamp_Instant", timeStr, + expectedTimestampInstantFormat); + return; + } + } else { + if (needConvertTimezone) { + second = readerTimezone->convertFromUTC(second); + } + } + dstBatch.data[idx] = second; + dstBatch.nanoseconds[idx] = nanos; + } + }; + + template + class StringVariantToDecimalColumnReader : public ConvertColumnReader { + public: + StringVariantToDecimalColumnReader(const Type& readType, const Type& fileType, + StripeStreams& stripe, bool throwOnOverflow) + : ConvertColumnReader(readType, fileType, stripe, throwOnOverflow), + precision_(static_cast(readType.getPrecision())), + scale_(static_cast(readType.getScale())) {} + + void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override { + ConvertColumnReader::next(rowBatch, numValues, notNull); + + const auto& srcBatch = *SafeCastBatchTo(data.get()); + auto& dstBatch = *SafeCastBatchTo(&rowBatch); + for (uint64_t i = 0; i < numValues; ++i) { + if (!rowBatch.hasNulls || rowBatch.notNull[i]) { + convertToDecimal(dstBatch, i, std::string(srcBatch.data[i], srcBatch.length[i])); + } + } + } + + private: + void convertToDecimal(ReadTypeBatch& dstBatch, uint64_t idx, const std::string& decimalStr) { + constexpr int32_t MAX_PRECISION_128 = 38; + int32_t fromPrecision = 0; + int32_t fromScale = 0; + uint32_t start = 0; + bool negative = false; + if (decimalStr.empty()) { + handleParseFromStringError(dstBatch, idx, throwOnOverflow, "Decimal", decimalStr); + return; + } + auto dotPos = decimalStr.find('.'); + if (dotPos == std::string::npos) { + fromScale = 0; + fromPrecision = decimalStr.length(); + dotPos = decimalStr.length(); + } else { + if (dotPos + 1 == decimalStr.length()) { + handleParseFromStringError(dstBatch, idx, throwOnOverflow, "Decimal", decimalStr); + return; + } + fromPrecision = decimalStr.length() - 1; + fromScale = decimalStr.length() - dotPos - 1; + } + if (decimalStr.front() == '-') { + negative = true; + start++; + fromPrecision--; + } + const std::string integerPortion = decimalStr.substr(start, dotPos - start); + if (dotPos == start || fromPrecision > MAX_PRECISION_128 || fromPrecision <= 0 || + !std::all_of(integerPortion.begin(), integerPortion.end(), ::isdigit)) { + handleParseFromStringError(dstBatch, idx, throwOnOverflow, "Decimal", decimalStr); + return; + } + + Int128 i128; + try { + bool overflow = false; + i128 = Int128(integerPortion); + // overflow won't happen + i128 *= scaleUpInt128ByPowerOfTen(Int128(1), fromScale, overflow); + } catch (const std::exception& e) { + handleParseFromStringError(dstBatch, idx, throwOnOverflow, "Decimal", decimalStr); + return; + } + if (dotPos + 1 < decimalStr.length()) { + const std::string fractionPortion = decimalStr.substr(dotPos + 1, fromScale); + if (!std::all_of(fractionPortion.begin(), fractionPortion.end(), ::isdigit)) { + handleOverflow(dstBatch, idx, throwOnOverflow); + return; + } + i128 += Int128(fractionPortion); + } + + auto [overflow, result] = convertDecimal(i128, fromScale, precision_, scale_); + if (overflow) { + handleOverflow(dstBatch, idx, throwOnOverflow); + return; + } + if (negative) { + result.negate(); + } + + if constexpr (std::is_same_v) { + dstBatch.values[idx] = result; + } else { + if (!result.fitsInLong()) { + handleOverflow(dstBatch, idx, + throwOnOverflow); + } else { + dstBatch.values[idx] = result.toLong(); + } + } + } + + const int32_t precision_; + const int32_t scale_; + }; + #define DEFINE_NUMERIC_CONVERT_READER(FROM, TO, TYPE) \ using FROM##To##TO##ColumnReader = \ NumericConvertColumnReader; @@ -843,6 +1069,12 @@ namespace orc { #define DEFINE_STRING_VARIANT_CONVERT_READER(FROM, TO) \ using FROM##To##TO##ColumnReader = StringVariantConvertColumnReader; +#define DEFINE_STRING_VARIANT_CONVERT_TO_TIMESTAMP_READER(FROM, TO) \ + using FROM##To##TO##ColumnReader = StringVariantToTimestampColumnReader; + +#define DEFINE_STRING_VARIANT_CONVERT_CONVERT_TO_DECIMAL_READER(FROM, TO) \ + using FROM##To##TO##ColumnReader = StringVariantToDecimalColumnReader; + DEFINE_NUMERIC_CONVERT_READER(Boolean, Byte, int8_t) DEFINE_NUMERIC_CONVERT_READER(Boolean, Short, int16_t) DEFINE_NUMERIC_CONVERT_READER(Boolean, Int, int32_t) @@ -973,12 +1205,28 @@ namespace orc { DEFINE_STRING_VARIANT_CONVERT_TO_NUMERIC_READER(Varchar, Double, double) // String variant to string variant + DEFINE_STRING_VARIANT_CONVERT_READER(String, String) DEFINE_STRING_VARIANT_CONVERT_READER(String, Char) DEFINE_STRING_VARIANT_CONVERT_READER(String, Varchar) + DEFINE_STRING_VARIANT_CONVERT_READER(Char, Char) DEFINE_STRING_VARIANT_CONVERT_READER(Char, String) DEFINE_STRING_VARIANT_CONVERT_READER(Char, Varchar) DEFINE_STRING_VARIANT_CONVERT_READER(Varchar, String) DEFINE_STRING_VARIANT_CONVERT_READER(Varchar, Char) + DEFINE_STRING_VARIANT_CONVERT_READER(Varchar, Varchar) + + // String variant to timestamp + DEFINE_STRING_VARIANT_CONVERT_TO_TIMESTAMP_READER(String, Timestamp) + DEFINE_STRING_VARIANT_CONVERT_TO_TIMESTAMP_READER(Char, Timestamp) + DEFINE_STRING_VARIANT_CONVERT_TO_TIMESTAMP_READER(Varchar, Timestamp) + + // String variant to decimal + DEFINE_STRING_VARIANT_CONVERT_CONVERT_TO_DECIMAL_READER(String, Decimal64) + DEFINE_STRING_VARIANT_CONVERT_CONVERT_TO_DECIMAL_READER(String, Decimal128) + DEFINE_STRING_VARIANT_CONVERT_CONVERT_TO_DECIMAL_READER(Char, Decimal64) + DEFINE_STRING_VARIANT_CONVERT_CONVERT_TO_DECIMAL_READER(Char, Decimal128) + DEFINE_STRING_VARIANT_CONVERT_CONVERT_TO_DECIMAL_READER(Varchar, Decimal64) + DEFINE_STRING_VARIANT_CONVERT_CONVERT_TO_DECIMAL_READER(Varchar, Decimal128) #define CREATE_READER(NAME) \ return std::make_unique(readType, fileType, stripe, throwOnOverflow); @@ -1242,18 +1490,24 @@ namespace orc { CASE_CREATE_READER(LONG, StringToLong) CASE_CREATE_READER(FLOAT, StringToFloat) CASE_CREATE_READER(DOUBLE, StringToDouble) + CASE_CREATE_READER(STRING, StringToString) CASE_CREATE_READER(CHAR, StringToChar) CASE_CREATE_READER(VARCHAR, StringToVarchar) - case STRING: + CASE_CREATE_READER(TIMESTAMP, StringToTimestamp) + CASE_CREATE_READER(TIMESTAMP_INSTANT, StringToTimestamp) + case DECIMAL: { + if (isDecimal64(readType)) { + CREATE_READER(StringToDecimal64ColumnReader) + } else { + CREATE_READER(StringToDecimal128ColumnReader) + } + } case BINARY: - case TIMESTAMP: case LIST: case MAP: case STRUCT: case UNION: case DATE: - case TIMESTAMP_INSTANT: - case DECIMAL: CASE_EXCEPTION } } @@ -1267,17 +1521,23 @@ namespace orc { CASE_CREATE_READER(FLOAT, CharToFloat) CASE_CREATE_READER(DOUBLE, CharToDouble) CASE_CREATE_READER(STRING, CharToString) + CASE_CREATE_READER(CHAR, CharToChar) CASE_CREATE_READER(VARCHAR, CharToVarchar) - case CHAR: + CASE_CREATE_READER(TIMESTAMP, CharToTimestamp) + CASE_CREATE_READER(TIMESTAMP_INSTANT, CharToTimestamp) + case DECIMAL: { + if (isDecimal64(readType)) { + CREATE_READER(CharToDecimal64ColumnReader) + } else { + CREATE_READER(CharToDecimal128ColumnReader) + } + } case BINARY: - case TIMESTAMP: case LIST: case MAP: case STRUCT: case UNION: case DATE: - case TIMESTAMP_INSTANT: - case DECIMAL: CASE_EXCEPTION } } @@ -1292,16 +1552,22 @@ namespace orc { CASE_CREATE_READER(DOUBLE, VarcharToDouble) CASE_CREATE_READER(STRING, VarcharToString) CASE_CREATE_READER(CHAR, VarcharToChar) - case VARCHAR: + CASE_CREATE_READER(VARCHAR, VarcharToVarchar) + CASE_CREATE_READER(TIMESTAMP, VarcharToTimestamp) + CASE_CREATE_READER(TIMESTAMP_INSTANT, VarcharToTimestamp) + case DECIMAL: { + if (isDecimal64(readType)) { + CREATE_READER(VarcharToDecimal64ColumnReader) + } else { + CREATE_READER(VarcharToDecimal128ColumnReader) + } + } case BINARY: - case TIMESTAMP: case LIST: case MAP: case STRUCT: case UNION: case DATE: - case TIMESTAMP_INSTANT: - case DECIMAL: CASE_EXCEPTION } } diff --git a/c++/src/SchemaEvolution.cc b/c++/src/SchemaEvolution.cc index ab4007309b..7cf3b5c512 100644 --- a/c++/src/SchemaEvolution.cc +++ b/c++/src/SchemaEvolution.cc @@ -106,7 +106,8 @@ namespace orc { case STRING: case CHAR: case VARCHAR: { - ret.isValid = ret.needConvert = isStringVariant(readType) || isNumeric(readType); + ret.isValid = ret.needConvert = isStringVariant(readType) || isNumeric(readType) || + isTimestamp(readType) || isDecimal(readType); break; } case TIMESTAMP: diff --git a/c++/test/TestConvertColumnReader.cc b/c++/test/TestConvertColumnReader.cc index f9f7ac61d5..bebe251f4f 100644 --- a/c++/test/TestConvertColumnReader.cc +++ b/c++/test/TestConvertColumnReader.cc @@ -976,4 +976,250 @@ namespace orc { EXPECT_EQ(std::string(readC3.data[3], readC3.length[3]), "1234"); } + // Returns year/month/day triple in civil calendar + // Preconditions: z is number of days since 1970-01-01 and is in the range: + // [numeric_limits::min(), numeric_limits::max()-719468]. + template + constexpr std::tuple civil_from_days(Int z) noexcept { + static_assert(std::numeric_limits::digits >= 18, + "This algorithm has not been ported to a 16 bit unsigned integer"); + static_assert(std::numeric_limits::digits >= 20, + "This algorithm has not been ported to a 16 bit signed integer"); + z += 719468; + const Int era = (z >= 0 ? z : z - 146096) / 146097; + const unsigned doe = static_cast(z - era * 146097); // [0, 146096] + const unsigned yoe = (doe - doe / 1460 + doe / 36524 - doe / 146096) / 365; // [0, 399] + const Int y = static_cast(yoe) + era * 400; + const unsigned doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // [0, 365] + const unsigned mp = (5 * doy + 2) / 153; // [0, 11] + const unsigned d = doy - (153 * mp + 2) / 5 + 1; // [1, 31] + const unsigned m = mp < 10 ? mp + 3 : mp - 9; // [1, 12] + return std::tuple(y + (m <= 2), m, d); + } + + static std::string timestampToString(int64_t seconds, int64_t nanos, + const std::string& zoneName) { + auto& timezone = getTimezoneByName(zoneName); + seconds = timezone.convertToUTC(seconds); + time_t t = static_cast(seconds); + char buffer[100]; + constexpr auto SECOND_IN_DAY = 3600 * 24; + auto day = t < 0 ? (t - SECOND_IN_DAY + 1) / SECOND_IN_DAY : t / SECOND_IN_DAY; + + auto [y, m, d] = civil_from_days(day); + auto second_in_day = t % (3600 * 24); + if (second_in_day < 0) { + second_in_day += 3600 * 24; + } + auto h = second_in_day % (3600 * 24) / 3600; + auto min = second_in_day % 3600 / 60; + auto s = second_in_day % 60; + std::snprintf(buffer, sizeof(buffer), "%04d-%02d-%02d %02ld:%02ld:%02ld", y, m, d, h, min, s); + std::string result(buffer); + if (nanos) { + while (nanos % 10 == 0) nanos /= 10; + result = result + "." + std::to_string(nanos); + } + result = result + " " + zoneName; + return result; + } + + TEST(ConvertColumnReader, TestConvertStringVariantToTimestamp) { + constexpr int DEFAULT_MEM_STREAM_SIZE = 10 * 1024 * 1024; + constexpr int TEST_CASES = 1024; + const std::string writerTimezone = "America/New_York"; + const std::string readerTimezone = "Australia/Sydney"; + MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); + std::unique_ptr fileType(Type::buildTypeFromString("struct")); + std::shared_ptr readType( + Type::buildTypeFromString("struct")); + WriterOptions options; + options.setTimezoneName(writerTimezone); + auto writer = createWriter(*fileType, &memStream, options); + auto batch = writer->createRowBatch(TEST_CASES); + auto structBatch = dynamic_cast(batch.get()); + auto& c1 = dynamic_cast(*structBatch->fields[0]); + auto& c2 = dynamic_cast(*structBatch->fields[1]); + + std::vector raw1, raw2; + raw1.reserve(TEST_CASES * 3); + raw2.reserve(TEST_CASES * 3); + std::vector ts1, ts2; + + for (int i = 0; i < TEST_CASES; i++) { + char buff[100]; + auto size = ::snprintf(buff, sizeof(buff), "%04d-%02d-27 12:34:56.789", 1960 + (i / 12), + (i % 12) + 1); + raw1.emplace_back(buff, size); + raw2.push_back(raw1.back() + " " + writerTimezone); + c1.data[i] = const_cast(raw1.back().c_str()); + c1.length[i] = raw1.back().length(); + c2.data[i] = const_cast(raw2.back().c_str()); + c2.length[i] = raw2.back().length(); + } + structBatch->numElements = c1.numElements = c2.numElements = TEST_CASES; + structBatch->hasNulls = c1.hasNulls = c2.hasNulls = false; + writer->add(*batch); + + for (int i = 0; i < TEST_CASES; i++) { + char buff[100]; + auto size = + ::snprintf(buff, sizeof(buff), "%04d-%02d-27 12:34:56", 1960 + (i / 12), (i % 12) + 1); + raw1.emplace_back(buff, size); + raw2.push_back(raw1.back() + " " + writerTimezone); + c1.data[i] = const_cast(raw1.back().c_str()); + c1.length[i] = raw1.back().length(); + c2.data[i] = const_cast(raw2.back().c_str()); + c2.length[i] = raw2.back().length(); + } + structBatch->numElements = c1.numElements = c2.numElements = TEST_CASES; + structBatch->hasNulls = c1.hasNulls = c2.hasNulls = false; + writer->add(*batch); + + { + raw1.push_back("2024?11-14 00:01:02"); + raw2.push_back("2024-01-02 03:04:05.678 tz/error"); + c1.data[0] = const_cast(raw1.back().c_str()); + c1.length[0] = raw1.back().length(); + c2.data[0] = const_cast(raw2.back().c_str()); + c2.length[0] = raw2.back().length(); + + c1.notNull[1] = false; + c2.notNull[1] = false; + + raw1.push_back("2024-12-14 00:01:02.-1"); + raw2.push_back("2024-01-02 03:04:05.678"); + c1.data[2] = const_cast(raw1.back().c_str()); + c1.length[2] = raw1.back().length(); + c2.data[2] = const_cast(raw2.back().c_str()); + c2.length[2] = raw2.back().length(); + } + structBatch->numElements = c1.numElements = c2.numElements = 3; + structBatch->hasNulls = c1.hasNulls = c2.hasNulls = true; + writer->add(*batch); + + writer->close(); + + auto inStream = std::make_unique(memStream.getData(), memStream.getLength()); + auto pool = getDefaultPool(); + auto reader = createReader(*pool, std::move(inStream)); + RowReaderOptions rowReaderOptions; + rowReaderOptions.setUseTightNumericVector(true); + rowReaderOptions.setReadType(readType); + rowReaderOptions.setTimezoneName(readerTimezone); + rowReaderOptions.throwOnSchemaEvolutionOverflow(true); + auto rowReader = reader->createRowReader(rowReaderOptions); + auto readBatch = rowReader->createRowBatch(TEST_CASES * 2); + EXPECT_EQ(true, rowReader->next(*readBatch)); + + auto& readSturctBatch = dynamic_cast(*readBatch); + auto& readC1 = dynamic_cast(*readSturctBatch.fields[0]); + auto& readC2 = dynamic_cast(*readSturctBatch.fields[1]); + + for (int i = 0; i < TEST_CASES * 2; i++) { + EXPECT_TRUE(readC1.notNull[i]); + EXPECT_TRUE(readC2.notNull[i]); + EXPECT_EQ(raw1[i] + " " + readerTimezone, + timestampToString(readC1.data[i], readC1.nanoseconds[i], readerTimezone)); + EXPECT_EQ(raw2[i], timestampToString(readC2.data[i], readC2.nanoseconds[i], writerTimezone)); + } + + rowReaderOptions.throwOnSchemaEvolutionOverflow(false); + rowReader = reader->createRowReader(rowReaderOptions); + EXPECT_EQ(true, rowReader->next(*readBatch)); + EXPECT_EQ(true, rowReader->next(*readBatch)); + EXPECT_EQ(3, readBatch->numElements); + EXPECT_FALSE(readC1.notNull[0]); + EXPECT_FALSE(readC2.notNull[0]); + EXPECT_FALSE(readC1.notNull[1]); + EXPECT_FALSE(readC2.notNull[1]); + EXPECT_FALSE(readC1.notNull[2]); + EXPECT_FALSE(readC2.notNull[2]); + } + + TEST(ConvertColumnReader, TestConvertStringVariantToDecimal) { + constexpr int DEFAULT_MEM_STREAM_SIZE = 10 * 1024 * 1024; + constexpr int TEST_CASES = 1024; + MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); + std::unique_ptr fileType(Type::buildTypeFromString("struct")); + std::shared_ptr readType( + Type::buildTypeFromString("struct")); + WriterOptions options; + auto writer = createWriter(*fileType, &memStream, options); + auto batch = writer->createRowBatch(TEST_CASES); + auto structBatch = dynamic_cast(batch.get()); + auto& c1 = dynamic_cast(*structBatch->fields[0]); + auto& c2 = dynamic_cast(*structBatch->fields[1]); + + // + std::vector> rawDataAndExpected; + + rawDataAndExpected = { + /* 0 */ {"123456789012345678901234567890123456789", false, false, int64_t(), Int128()}, + /* 1 */ {"123456789012345678901234567890.1234567890", false, false, int64_t(), Int128()}, + /* 2 */ {"-123456789012345678901234567890.1234567890", false, false, int64_t(), Int128()}, + /* 3 */ {"-foo.bar", false, false, int64_t(), Int128()}, + /* 4 */ {"-foo.123", false, false, int64_t(), Int128()}, + /* 5 */ {"-123.foo", false, false, int64_t(), Int128()}, + /* 6 */ {"-123foo.123", false, false, int64_t(), Int128()}, + /* 7 */ {"-123.123foo", false, false, int64_t(), Int128()}, + /* 8 */ {"-.", false, false, int64_t(), Int128()}, + /* 9 */ {"-", false, false, int64_t(), Int128()}, + /* 10 */ {".", false, false, int64_t(), Int128()}, + /* 11 */ {"", false, false, int64_t(), Int128()}, + /* 12 */ {".12345", false, false, int64_t(), Int128()}, + /* 13 */ {"12345.", false, false, int64_t(), Int128()}, + /* 14 */ {"-1", true, true, -100000LL, Int128("-10000000000")}, + /* 15 */ {"-1.0", true, true, -100000LL, Int128("-10000000000")}, + /* 16 */ {"1", true, true, 100000, Int128("10000000000")}, + /* 17 */ {"1.0", true, true, 100000, Int128("10000000000")}, + /* 18 */ {"12345", true, true, 1234500000, Int128("123450000000000")}, + /* 19 */ {"12345.12345", true, true, 1234512345LL, Int128("123451234500000")}, + /* 20 */ {"-12345.12345", true, true, -1234512345LL, Int128("-123451234500000")}, + /* 21 */ {"1234567890", false, true, int64_t(), Int128("12345678900000000000")}, + /* 22 */ {"-1234567890", false, true, int64_t(), Int128("-12345678900000000000")}, + /* 23 */ {"1234567890.123", false, true, int64_t(), Int128("12345678901230000000")}, + /* 24 */ {"-1234567890.1234567", false, true, int64_t(), Int128("-12345678901234567000")}, + /* 25 */ {"1234567890123.12345", false, true, int64_t(), Int128("12345678901231234500000")}, + /* 26 */ + {"-1234567890123.12345678901", false, true, int64_t(), Int128("-12345678901231234567890")}}; + for (int i = 0; i < rawDataAndExpected.size(); i++) { + c1.data[i] = c2.data[i] = const_cast(std::get<0>(rawDataAndExpected[i]).c_str()); + c1.length[i] = c2.length[i] = std::get<0>(rawDataAndExpected[i]).length(); + } + + structBatch->numElements = c1.numElements = c2.numElements = rawDataAndExpected.size(); + structBatch->hasNulls = c1.hasNulls = c2.hasNulls = false; + writer->add(*batch); + writer->close(); + + auto inStream = std::make_unique(memStream.getData(), memStream.getLength()); + auto pool = getDefaultPool(); + auto reader = createReader(*pool, std::move(inStream)); + RowReaderOptions rowReaderOptions; + rowReaderOptions.setUseTightNumericVector(true); + rowReaderOptions.setReadType(readType); + auto rowReader = reader->createRowReader(rowReaderOptions); + auto readBatch = rowReader->createRowBatch(TEST_CASES); + EXPECT_EQ(true, rowReader->next(*readBatch)); + + auto& readSturctBatch = dynamic_cast(*readBatch); + auto& readC1 = dynamic_cast(*readSturctBatch.fields[0]); + auto& readC2 = dynamic_cast(*readSturctBatch.fields[1]); + EXPECT_EQ(readBatch->numElements, rawDataAndExpected.size()); + + for (int i = 0; i < readBatch->numElements; i++) { + bool expectedNotNull1 = std::get<1>(rawDataAndExpected[i]); + bool expectedNotNull2 = std::get<2>(rawDataAndExpected[i]); + EXPECT_EQ(expectedNotNull1, readC1.notNull[i]) << i; + EXPECT_EQ(expectedNotNull2, readC2.notNull[i]) << i; + if (expectedNotNull1) { + EXPECT_EQ(std::get<3>(rawDataAndExpected[i]), readC1.values[i]) << i; + } + if (expectedNotNull2) { + EXPECT_EQ(std::get<4>(rawDataAndExpected[i]), readC2.values[i]) << i; + } + } + } + } // namespace orc diff --git a/c++/test/TestSchemaEvolution.cc b/c++/test/TestSchemaEvolution.cc index 12001fca6c..d146853573 100644 --- a/c++/test/TestSchemaEvolution.cc +++ b/c++/test/TestSchemaEvolution.cc @@ -45,17 +45,17 @@ namespace orc { directEncoding.set_kind(proto::ColumnEncoding_Kind_DIRECT); EXPECT_CALL(streams, getEncoding(testing::_)).WillRepeatedly(testing::Return(directEncoding)); - EXPECT_CALL(streams, getStreamProxy(testing::_, testing::_, testing::_)) - .WillRepeatedly(testing::Return(nullptr)); - std::string dummyStream("dummy"); - ON_CALL(streams, getStreamProxy(1, proto::Stream_Kind_SECONDARY, testing::_)) - .WillByDefault(testing::Return( - new SeekableArrayInputStream(dummyStream.c_str(), dummyStream.length()))); + EXPECT_CALL(streams, getStreamProxy(testing::_, testing::_, testing::_)) + .WillRepeatedly(testing::ReturnNew(dummyStream.c_str(), + dummyStream.length())); + EXPECT_CALL(streams, isDecimalAsLong()).WillRepeatedly(testing::Return(false)); EXPECT_CALL(streams, getSchemaEvolution()).WillRepeatedly(testing::Return(&se)); + EXPECT_CALL(streams, getSelectedColumns()) + .WillRepeatedly(testing::Return(std::vector{true, true})); - EXPECT_TRUE(buildReader(*fileType, streams) != nullptr); + EXPECT_TRUE(buildReader(*fileType, streams, true) != nullptr); } return true; } @@ -66,8 +66,8 @@ namespace orc { {2, "struct"}, {3, "struct"}, {4, "struct"}, {5, "struct"}, {6, "struct"}, {7, "struct"}, - {8, "struct"}, {9, "struct"}, - {10, "struct"}, {11, "struct"}, + {8, "struct"}, {9, "struct"}, + {10, "struct"}, {11, "struct"}, {12, "struct"}, {13, "struct"}, {14, "struct"}, {15, "struct"}, {16, "struct"}}; @@ -164,6 +164,22 @@ namespace orc { } } + // conversion from string variant to decimal + for (size_t i = 7; i <= 11; i++) { + for (size_t j = 12; j <= 13; j++) { + canConvert[i][j] = true; + needConvert[i][j] = (i != j); + } + } + + // conversion from string variant to timestamp + for (size_t i = 7; i <= 11; i++) { + for (size_t j = 14; j <= 15; j++) { + canConvert[i][j] = true; + needConvert[i][j] = (i != j); + } + } + for (size_t i = 0; i < typesSize; i++) { for (size_t j = 0; j < typesSize; j++) { testConvertReader(types[i], types[j], canConvert[i][j], needConvert[i][j]);