Skip to content

Commit

Permalink
fix: iceberg timestamp value handling (#49807)
Browse files Browse the repository at this point in the history
  • Loading branch information
subodh1810 authored Dec 16, 2024
1 parent 3af71ab commit 369cde7
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ import java.time.temporal.ChronoUnit
*/
class TimeStringToInteger : AirbyteValueIdentityMapper() {
companion object {
private val DATE_TIME_FORMATTER: DateTimeFormatter =
val DATE_TIME_FORMATTER: DateTimeFormatter =
DateTimeFormatter.ofPattern(
"[yyyy][yy]['-']['/']['.'][' '][MMM][MM][M]['-']['/']['.'][' '][dd][d][[' '][G]][[' ']['T']HH:mm[':'ss[.][SSSSSS][SSSSS][SSSS][SSS][' '][z][zzz][Z][O][x][XXX][XX][X][[' '][G]]]]"
)
private val TIME_FORMATTER: DateTimeFormatter =
val TIME_FORMATTER: DateTimeFormatter =
DateTimeFormatter.ofPattern(
"HH:mm[':'ss[.][SSSSSS][SSSSS][SSSS][SSS][' '][z][zzz][Z][O][x][XXX][XX][X]]"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,27 @@ class AirbyteValueToIcebergRecord {
return array
}
is BooleanValue -> return airbyteValue.value
is DateValue ->
throw IllegalArgumentException("String-based date types are not supported")
is DateValue -> return TimeStringUtility.toLocalDate(airbyteValue.value)
is IntegerValue -> return airbyteValue.value.toLong()
is NullValue -> return null
is NumberValue -> return airbyteValue.value.toDouble()
is StringValue -> return airbyteValue.value
is TimeValue ->
throw IllegalArgumentException("String-based time types are not supported")
return when (type.typeId()) {
Type.TypeID.TIME -> TimeStringUtility.toOffset(airbyteValue.value)
else ->
throw IllegalArgumentException(
"${type.typeId()} type is not allowed for TimeValue"
)
}
is TimestampValue ->
throw IllegalArgumentException("String-based timestamp types are not supported")
return when (type.typeId()) {
Type.TypeID.TIMESTAMP -> TimeStringUtility.toOffsetDateTime(airbyteValue.value)
else ->
throw IllegalArgumentException(
"${type.typeId()} type is not allowed for TimestampValue"
)
}
is UnknownValue -> throw IllegalArgumentException("Unknown type is not supported")
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.data.iceberg.parquet

import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.AirbyteSchemaNoopMapper
import io.airbyte.cdk.load.data.AirbyteValueNoopMapper
import io.airbyte.cdk.load.data.MapperPipeline
import io.airbyte.cdk.load.data.MapperPipelineFactory
import io.airbyte.cdk.load.data.MergeUnions
import io.airbyte.cdk.load.data.NullOutOfRangeIntegers
import io.airbyte.cdk.load.data.SchemalessValuesToJsonString
import io.airbyte.cdk.load.data.UnionTypeToDisjointRecord
import io.airbyte.cdk.load.data.UnionValueToDisjointRecord

/**
 * [MapperPipelineFactory] that assembles the mapper pipeline used for Iceberg/Parquet output.
 *
 * Each stage pairs a schema mapper with the value mapper that is applied alongside it; the
 * stages are listed in the order they are handed to [MapperPipeline].
 */
class IcebergParquetPipelineFactory : MapperPipelineFactory {
    /** Builds the pipeline for [stream], starting from the stream's declared schema. */
    override fun create(stream: DestinationStream): MapperPipeline {
        return MapperPipeline(
            stream.schema,
            listOf(
                // Schema untouched; only values are rewritten by this stage.
                Pair(AirbyteSchemaNoopMapper(), SchemalessValuesToJsonString()),
                // Schema untouched; only values are rewritten by this stage.
                Pair(AirbyteSchemaNoopMapper(), NullOutOfRangeIntegers()),
                // Schema-only stage; values pass through unchanged.
                Pair(MergeUnions(), AirbyteValueNoopMapper()),
                // Schema and values are transformed together.
                Pair(UnionTypeToDisjointRecord(), UnionValueToDisjointRecord()),
            ),
        )
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.data.iceberg.parquet

import io.airbyte.cdk.load.data.TimeStringToInteger
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.LocalTime
import java.time.OffsetDateTime
import java.time.OffsetTime
import java.time.ZoneOffset
import java.time.ZonedDateTime

/**
 * Parsing helpers that turn date, time and timestamp strings into `java.time` values using the
 * formatters exposed by [TimeStringToInteger].
 *
 * Time and timestamp parsing is attempted twice: first assuming the string carries a zone/offset,
 * then assuming it does not. If both attempts fail, the exception from the second attempt is
 * thrown with the first attempt's exception attached as a suppressed exception, so neither
 * diagnostic is lost.
 */
object TimeStringUtility {

    /**
     * Parses [dateString] into a [LocalDate] using [TimeStringToInteger.DATE_TIME_FORMATTER].
     *
     * @throws java.time.format.DateTimeParseException if the string cannot be parsed.
     */
    fun toLocalDate(dateString: String): LocalDate =
        LocalDate.parse(dateString, TimeStringToInteger.DATE_TIME_FORMATTER)

    /**
     * Parses [timeString] into a [LocalTime].
     *
     * When the string carries an offset, the offset is dropped rather than applied: the returned
     * value is the wall-clock time exactly as written (e.g. `12:34:56+02:00` yields `12:34:56`).
     *
     * @throws java.time.format.DateTimeParseException if the string matches neither form.
     */
    fun toOffset(timeString: String): LocalTime =
        try {
            toMicrosOfDayWithTimezone(timeString)
        } catch (withOffsetFailure: java.time.format.DateTimeParseException) {
            // Only parse failures trigger the fallback; anything else is a real bug and propagates.
            fallbackKeepingCause(withOffsetFailure) { toMicrosOfDayWithoutTimezone(timeString) }
        }

    /** Parses a time that includes an offset and returns only its local-time part. */
    private fun toMicrosOfDayWithTimezone(timeString: String): LocalTime =
        OffsetTime.parse(timeString, TimeStringToInteger.TIME_FORMATTER).toLocalTime()

    /** Parses a time that has no offset. */
    private fun toMicrosOfDayWithoutTimezone(timeString: String): LocalTime =
        LocalTime.parse(timeString, TimeStringToInteger.TIME_FORMATTER)

    /**
     * Parses [timestampString] into an [OffsetDateTime].
     *
     * A string that carries a zone/offset keeps it; a string without one is interpreted as UTC.
     *
     * @throws java.time.format.DateTimeParseException if the string matches neither form.
     */
    fun toOffsetDateTime(timestampString: String): OffsetDateTime =
        try {
            toOffsetDateTimeWithTimezone(timestampString)
        } catch (withZoneFailure: java.time.format.DateTimeParseException) {
            // Only parse failures trigger the fallback; anything else is a real bug and propagates.
            fallbackKeepingCause(withZoneFailure) {
                toOffsetDateTimeWithoutTimezone(timestampString)
            }
        }

    /** Parses a timestamp that includes a zone/offset. */
    private fun toOffsetDateTimeWithTimezone(timestampString: String): OffsetDateTime =
        ZonedDateTime.parse(timestampString, TimeStringToInteger.DATE_TIME_FORMATTER)
            .toOffsetDateTime()

    /** Parses a timestamp without zone information and pins it to UTC. */
    private fun toOffsetDateTimeWithoutTimezone(timestampString: String): OffsetDateTime =
        LocalDateTime.parse(timestampString, TimeStringToInteger.DATE_TIME_FORMATTER)
            .atOffset(ZoneOffset.UTC)

    /**
     * Runs [fallback]; if it also fails to parse, records [firstFailure] on the thrown exception
     * as a suppressed exception before rethrowing it.
     */
    private inline fun <T> fallbackKeepingCause(
        firstFailure: java.time.format.DateTimeParseException,
        fallback: () -> T,
    ): T =
        try {
            fallback()
        } catch (secondFailure: java.time.format.DateTimeParseException) {
            secondFailure.addSuppressed(firstFailure)
            throw secondFailure
        }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.data.icerberg.parquet

import io.airbyte.cdk.load.data.iceberg.parquet.TimeStringUtility
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.LocalTime
import java.time.OffsetDateTime
import java.time.ZoneOffset
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertThrows
import org.junit.jupiter.api.Test

/**
 * Unit tests for [TimeStringUtility]: date, time and timestamp strings with and without zone
 * information, plus rejection of unparseable input.
 */
class TimeStringUtilityTest {

    @Test
    fun `toLocalDate should parse a valid date string`() {
        val expected = LocalDate.of(2024, 12, 16)

        val actual = TimeStringUtility.toLocalDate("2024-12-16T00:00:00")

        assertEquals(expected, actual)
    }

    @Test
    fun `toLocalDate should throw exception for invalid date string`() {
        assertThrows(java.time.format.DateTimeParseException::class.java) {
            TimeStringUtility.toLocalDate("invalid-date")
        }
    }

    @Test
    fun `toOffset should parse time with timezone`() {
        // The offset is expected to be dropped, leaving the wall-clock time as written.
        val expected = LocalTime.of(12, 34, 56)

        val actual = TimeStringUtility.toOffset("12:34:56+02:00")

        assertEquals(expected, actual)
    }

    @Test
    fun `toOffset should parse time without timezone`() {
        val expected = LocalTime.of(12, 34, 56)

        val actual = TimeStringUtility.toOffset("12:34:56")

        assertEquals(expected, actual)
    }

    @Test
    fun `toOffsetDateTime should parse datetime with timezone`() {
        val expected =
            LocalDateTime.of(2024, 12, 16, 12, 34, 56).atOffset(ZoneOffset.of("-05:00"))

        val actual: OffsetDateTime = TimeStringUtility.toOffsetDateTime("2024-12-16T12:34:56-05:00")

        assertEquals(expected, actual)
    }

    @Test
    fun `toOffsetDateTime should parse datetime without timezone as UTC`() {
        val expected = LocalDateTime.of(2024, 12, 16, 12, 34, 56).atOffset(ZoneOffset.UTC)

        val actual: OffsetDateTime = TimeStringUtility.toOffsetDateTime("2024-12-16T12:34:56")

        assertEquals(expected, actual)
    }

    @Test
    fun `toOffsetDateTime should throw exception for invalid format`() {
        assertThrows(java.time.format.DateTimeParseException::class.java) {
            TimeStringUtility.toOffsetDateTime("invalid-datetime")
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ data:
connectorSubtype: file
connectorType: destination
definitionId: 37a928c1-2d5c-431a-a97d-ae236bd1ea0c
dockerImageTag: 0.1.13
dockerImageTag: 0.1.14
dockerRepository: airbyte/destination-iceberg-v2
githubIssueLabel: destination-iceberg-v2
icon: s3.svg
icon: icon.svg
license: ELv2
name: Iceberg V2 Destination
registryOverrides:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
package io.airbyte.integrations.destination.iceberg.v2

import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.parquet.ParquetMapperPipelineFactory
import io.airbyte.cdk.load.data.iceberg.parquet.IcebergParquetPipelineFactory
import io.airbyte.cdk.load.write.DestinationWriter
import io.airbyte.cdk.load.write.StreamLoader
import io.airbyte.integrations.destination.iceberg.v2.io.IcebergTableWriterFactory
Expand All @@ -23,7 +23,7 @@ class IcebergV2Writer(
override fun createStreamLoader(stream: DestinationStream): StreamLoader {
val properties = icebergUtil.toCatalogProperties(config = icebergConfiguration)
val catalog = icebergUtil.createCatalog(DEFAULT_CATALOG_NAME, properties)
val pipeline = ParquetMapperPipelineFactory().create(stream)
val pipeline = IcebergParquetPipelineFactory().create(stream)
val schema = icebergUtil.toIcebergSchema(stream = stream, pipeline = pipeline)
val table =
icebergUtil.createTable(
Expand Down

0 comments on commit 369cde7

Please sign in to comment.