Skip to content

Commit

Permalink
🎉 Destination MySQl - Added support for MySQL destination via TLS/SSL (
Browse files Browse the repository at this point in the history
…#6506)

* add support SSL for MySQL destination

* updated ssl tests and add documentation

* updated code style

* changed default ssl value as true
  • Loading branch information
andriikorotkov authored Sep 29, 2021
1 parent a595c75 commit bbf098a
Show file tree
Hide file tree
Showing 8 changed files with 178 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "ca81ee7c-3163-4246-af40-094cc31e5e42",
"name": "MySQL",
"dockerRepository": "airbyte/destination-mysql",
"dockerImageTag": "0.1.12",
"dockerImageTag": "0.1.13",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/mysql"
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
- destinationDefinitionId: ca81ee7c-3163-4246-af40-094cc31e5e42
name: MySQL
dockerRepository: airbyte/destination-mysql
dockerImageTag: 0.1.12
dockerImageTag: 0.1.13
documentationUrl: https://docs.airbyte.io/integrations/destinations/mysql
- destinationDefinitionId: d4353156-9217-4cad-8dd7-c108fd4f74cf
name: MS SQL Server
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.12
LABEL io.airbyte.version=0.1.13
LABEL io.airbyte.name=airbyte/destination-mysql
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.airbyte.integrations.destination.mysql.MySQLSqlOperations.VersionCompatibility;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -73,12 +74,27 @@ protected JdbcDatabase getDatabase(JsonNode config) {

@Override
public JsonNode toJdbcConfig(JsonNode config) {
final List<String> additionalParameters = new ArrayList<>();

if (config.has("ssl") && config.get("ssl").asBoolean()) {
additionalParameters.add("useSSL=true");
additionalParameters.add("requireSSL=true");
additionalParameters.add("verifyServerCertificate=false");
}

final StringBuilder jdbcUrl = new StringBuilder(String.format("jdbc:mysql://%s:%s/%s",
config.get("host").asText(),
config.get("port").asText(),
config.get("database").asText()));

if (!additionalParameters.isEmpty()) {
jdbcUrl.append("?");
additionalParameters.forEach(x -> jdbcUrl.append(x).append("&"));
}

ImmutableMap.Builder<Object, Object> configBuilder = ImmutableMap.builder()
.put("username", config.get("username").asText())
.put("jdbc_url", String.format("jdbc:mysql://%s:%s/%s",
config.get("host").asText(),
config.get("port").asText(),
config.get("database").asText()));
.put("jdbc_url", jdbcUrl.toString());

if (config.has("password")) {
configBuilder.put("password", config.get("password").asText());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@
"type": "string",
"airbyte_secret": true,
"order": 4
},
"ssl": {
"title": "SSL Connection",
"description": "Encrypt data using SSL.",
"type": "boolean",
"default": true,
"order": 5
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ protected JsonNode getConfig() {
.put("password", db.getPassword())
.put("database", db.getDatabaseName())
.put("port", db.getFirstMappedPort())
.put("ssl", false)
.build());
}

Expand All @@ -78,6 +79,7 @@ protected JsonNode getFailCheckConfig() {
.put("password", "wrong password")
.put("database", db.getDatabaseName())
.put("port", db.getFirstMappedPort())
.put("ssl", false)
.build());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.mysql;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Databases;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import java.sql.SQLException;
import java.util.List;
import java.util.stream.Collectors;
import org.jooq.JSONFormat;
import org.jooq.JSONFormat.RecordFormat;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.MySQLContainer;

public class SslMySQLDestinationAcceptanceTest extends MySQLDestinationAcceptanceTest {

private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT);

private MySQLContainer<?> db;
private final ExtendedNameTransformer namingResolver = new MySQLNameTransformer();

@Override
protected JsonNode getConfig() {
return Jsons.jsonNode(ImmutableMap.builder()
.put("host", db.getHost())
.put("username", db.getUsername())
.put("password", db.getPassword())
.put("database", db.getDatabaseName())
.put("port", db.getFirstMappedPort())
.put("ssl", true)
.build());
}

@Override
protected JsonNode getFailCheckConfig() {
return Jsons.jsonNode(ImmutableMap.builder()
.put("host", db.getHost())
.put("username", db.getUsername())
.put("password", "wrong password")
.put("database", db.getDatabaseName())
.put("port", db.getFirstMappedPort())
.put("ssl", false)
.build());
}

@Override
protected List<JsonNode> retrieveRecords(TestDestinationEnv testEnv,
String streamName,
String namespace,
JsonNode streamSchema)
throws Exception {
return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace)
.stream()
.map(r -> Jsons.deserialize(r.get(JavaBaseConstants.COLUMN_NAME_DATA).asText()))
.collect(Collectors.toList());
}

@Override
protected List<JsonNode> retrieveNormalizedRecords(TestDestinationEnv testEnv, String streamName, String namespace) throws Exception {
String tableName = namingResolver.getIdentifier(streamName);
String schema = namingResolver.getIdentifier(namespace);
return retrieveRecordsFromTable(tableName, schema);
}

@Override
@Test
public void testCustomDbtTransformations() {
// We need to create view for testing custom dbt transformations
executeQuery("GRANT CREATE VIEW ON *.* TO " + db.getUsername() + "@'%';");
// overrides test with a no-op until https://github.com/dbt-labs/jaffle_shop/pull/8 is merged
// super.testCustomDbtTransformations();
}

@Override
protected void setup(TestDestinationEnv testEnv) {
db = new MySQLContainer<>("mysql:8.0");
db.start();
setLocalInFileToTrue();
revokeAllPermissions();
grantCorrectPermissions();
}

@Override
protected void tearDown(TestDestinationEnv testEnv) {
db.stop();
db.close();
}

private List<JsonNode> retrieveRecordsFromTable(String tableName, String schemaName) throws SQLException {
return Databases.createDatabase(
db.getUsername(),
db.getPassword(),
String.format("jdbc:mysql://%s:%s/%s?useSSL=true&requireSSL=true&verifyServerCertificate=false",
db.getHost(),
db.getFirstMappedPort(),
db.getDatabaseName()),
"com.mysql.cj.jdbc.Driver",
SQLDialect.MYSQL).query(
ctx -> ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(r -> r.formatJSON(JSON_FORMAT))
.map(Jsons::deserialize)
.collect(Collectors.toList()));
}

private void setLocalInFileToTrue() {
executeQuery("set global local_infile=true");
}

private void revokeAllPermissions() {
executeQuery("REVOKE ALL PRIVILEGES, GRANT OPTION FROM " + db.getUsername() + "@'%';");
}

private void grantCorrectPermissions() {
executeQuery("GRANT ALTER, CREATE, INSERT, SELECT, DROP ON *.* TO " + db.getUsername() + "@'%';");
}

private void executeQuery(String query) {
try {
Databases.createDatabase(
"root",
"test",
String.format("jdbc:mysql://%s:%s/%s?useSSL=true&requireSSL=true&verifyServerCertificate=false",
db.getHost(),
db.getFirstMappedPort(),
db.getDatabaseName()),
"com.mysql.cj.jdbc.Driver",
SQLDialect.MYSQL).query(
ctx -> ctx
.execute(query));
} catch (SQLException e) {
throw new RuntimeException(e);
}
}

}
1 change: 1 addition & 0 deletions docs/integrations/destinations/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ Using this feature requires additional configuration, when creating the destinat

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.13 | 2021-09-28 | [#6506](https://github.com/airbytehq/airbyte/pull/6506) | Added support for MySQL destination via TLS/SSL |
| 0.1.12 | 2021-09-24 | [#6317](https://github.com/airbytehq/airbyte/pull/6317) | Added option to connect to DB via SSH |
| 0.1.11 | 2021-07-30 | [#5125](https://github.com/airbytehq/airbyte/pull/5125) | Enable `additionalPropertities` in spec.json |
| 0.1.10 | 2021-07-28 | [#5026](https://github.com/airbytehq/airbyte/pull/5026) | Add sanitized json fields in raw tables to handle quotes in column names |
Expand Down

0 comments on commit bbf098a

Please sign in to comment.