Skip to content

Commit

Permalink
[source-mssql] update datetimeoffset format (#45142)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaohansong authored Sep 5, 2024
1 parent 4264c08 commit 458e96e
Show file tree
Hide file tree
Showing 6 changed files with 9 additions and 10 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerImageTag: 4.1.10
dockerImageTag: 4.1.11
dockerRepository: airbyte/source-mssql
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
githubIssueLabel: source-mssql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import com.google.common.collect.ImmutableList;
import com.microsoft.sqlserver.jdbc.SQLServerResultSetMetaData;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.source.relationaldb.CursorInfo;
import io.airbyte.cdk.integrations.source.relationaldb.models.CursorBasedStatus;
import io.airbyte.cdk.integrations.source.relationaldb.models.InternalModels.StateType;
Expand Down Expand Up @@ -79,7 +78,7 @@ public static void getIndexInfoForStreams(final JdbcDatabase database, final Con
final String query = INDEX_QUERY.formatted(fullTableName);
LOGGER.debug("Index lookup query: {}", query);
final List<JsonNode> jsonNodes = database.bufferedResultSetQuery(conn -> conn.prepareStatement(query).executeQuery(),
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
resultSet -> new MssqlSourceOperations().rowToJson(resultSet));
if (jsonNodes != null) {
jsonNodes.stream().map(node -> Jsons.convertValue(node, Index.class))
.forEach(i -> LOGGER.info("Index {}", i));
Expand All @@ -106,7 +105,7 @@ public static String getMaxOcValueForStream(final JdbcDatabase database,
LOGGER.info("Querying for max oc value: {}", maxOcQuery);
try {
final List<JsonNode> jsonNodes = database.bufferedResultSetQuery(conn -> conn.prepareStatement(maxOcQuery).executeQuery(),
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
resultSet -> new MssqlSourceOperations().rowToJson(resultSet));
Preconditions.checkState(jsonNodes.size() == 1);
if (jsonNodes.get(0).get(MAX_OC_COL) == null) {
LOGGER.info("Max PK is null for table {} - this could indicate an empty table", fullTableName);
Expand Down Expand Up @@ -213,7 +212,7 @@ public static Map<io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair,
final List<JsonNode> jsonNodes;
try {
jsonNodes = database.bufferedResultSetQuery(conn -> conn.prepareStatement(cursorBasedSyncStatusQuery).executeQuery(),
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
resultSet -> new MssqlSourceOperations().rowToJson(resultSet));
} catch (SQLException e) {
throw new RuntimeException("Failed to read max cursor value from %s.%s".formatted(namespace, name), e);
}
Expand Down Expand Up @@ -241,7 +240,7 @@ private static List<JsonNode> getTableEstimate(final JdbcDatabase database, fina
String.format(TABLE_ESTIMATE_QUERY, namespace, name);
LOGGER.info("Querying for table estimate size: {}", tableEstimateQuery);
final List<JsonNode> jsonNodes = database.bufferedResultSetQuery(conn -> conn.createStatement().executeQuery(tableEstimateQuery),
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
resultSet -> new MssqlSourceOperations().rowToJson(resultSet));
Preconditions.checkState(jsonNodes.size() == 1);
LOGGER.debug("Estimate: {}", jsonNodes);
return jsonNodes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

package io.airbyte.integrations.source.mssql;

import static io.airbyte.cdk.db.DataTypeUtils.OFFSETDATETIME_FORMATTER;
import static io.airbyte.cdk.db.DataTypeUtils.TIMESTAMPTZ_FORMATTER;
import static io.airbyte.cdk.db.jdbc.JdbcConstants.INTERNAL_COLUMN_NAME;
import static io.airbyte.cdk.db.jdbc.JdbcConstants.INTERNAL_COLUMN_TYPE;
import static io.airbyte.cdk.db.jdbc.JdbcConstants.INTERNAL_COLUMN_TYPE_NAME;
Expand Down Expand Up @@ -183,7 +183,7 @@ public JsonSchemaType getAirbyteType(final JDBCType jdbcType) {
protected void setTimestampWithTimezone(final PreparedStatement preparedStatement, final int parameterIndex, final String value)
throws SQLException {
try {
final OffsetDateTime offsetDateTime = OffsetDateTime.parse(value, OFFSETDATETIME_FORMATTER);
final OffsetDateTime offsetDateTime = OffsetDateTime.parse(value, TIMESTAMPTZ_FORMATTER);
final Timestamp timestamp = Timestamp.valueOf(offsetDateTime.atZoneSameInstant(offsetDateTime.getOffset()).toLocalDateTime());
// Final step of conversion from
// OffsetDateTime (a Java construct) object -> Timestamp (a Java construct) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,6 @@ static Optional<OrderedColumnInfo> getOrderedColumnInfo(final JdbcDatabase datab
final JDBCType ocFieldType = table.getFields().stream()
.filter(field -> field.getName().equals(ocFieldName))
.findFirst().get().getType();

final String ocMaxValue = MssqlQueryUtils.getMaxOcValueForStream(database, stream, ocFieldName, quoteString);
return Optional.of(new OrderedColumnInfo(ocFieldName, ocFieldType, ocMaxValue));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void setDateTimeOffsetColumnAsCursor() throws SQLException {
executeQuery(insertQuery);
expectedRecords.add(jsonNode);
}
final String cursorAnchorValue = "2023-01-01 00:00:00.0000000 +00:00";
final String cursorAnchorValue = "2023-01-01T00:00:00.000000+00:00";
final List<JsonNode> actualRecords = new ArrayList<>();
try (final Connection connection = testdb.getContainer().createConnection("")) {
final PreparedStatement preparedStatement = connection.prepareStatement(
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/mssql.md
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
| 4.1.11 | 2024-09-04 | [45142](https://github.com/airbytehq/airbyte/pull/45142) | Fix incorrect datetimeoffset format in cursor state. |
| 4.1.10 | 2024-08-27 | [44759](https://github.com/airbytehq/airbyte/pull/44759) | Improve null safety in parsing debezium change events. |
| 4.1.9 | 2024-08-27 | [44841](https://github.com/airbytehq/airbyte/pull/44841) | Adopt latest CDK. |
| 4.1.8 | 2024-08-08 | [43410](https://github.com/airbytehq/airbyte/pull/43410) | Adopt latest CDK. |
Expand Down

0 comments on commit 458e96e

Please sign in to comment.