Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉Destination MySQl - added ssl strict encrypt connector #6763

Merged
merged 17 commits into from
Oct 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
afcc8c1
[6423] Destination MySQl - added ssl strict encrypt connector
etsybaev Oct 5, 2021
2fb7ee6
[6423] Destination MySQl - fixed tests and checkstyle
etsybaev Oct 5, 2021
9777b67
[6423] Destination MySQl - added README.md
etsybaev Oct 5, 2021
647b3af
[6423] Destination MySQl - Added changelog
etsybaev Oct 5, 2021
1f64d27
Merge branch 'master' into etsybaev/6423-destination-mysql-ssl-strict…
etsybaev Oct 6, 2021
36bacce
Merge branch 'master' into etsybaev/6423-destination-mysql-ssl-strict…
etsybaev Oct 6, 2021
ceaaf1a
fixed code style for changes from master
etsybaev Oct 6, 2021
17d8232
fixed build.gradle file
etsybaev Oct 6, 2021
c150db3
Merge branch 'master' into etsybaev/6423-destination-mysql-ssl-strict…
etsybaev Oct 6, 2021
60844c5
Merge branch 'master' into etsybaev/6423-destination-mysql-ssl-strict…
etsybaev Oct 7, 2021
972e547
Merge branch 'master' into etsybaev/6423-destination-mysql-ssl-strict…
etsybaev Oct 7, 2021
ffbd8b1
Merge branch 'master' into etsybaev/6423-destination-mysql-ssl-strict…
etsybaev Oct 7, 2021
692c661
Merge branch 'master' into etsybaev/6423-destination-mysql-ssl-strict…
etsybaev Oct 11, 2021
f363e90
Merge branch 'master' into etsybaev/6423-destination-mysql-ssl-strict…
etsybaev Oct 12, 2021
4e40b92
fixed checkstyle
etsybaev Oct 12, 2021
46ed4ac
Merge branch 'master' into etsybaev/6423-destination-mysql-ssl-strict…
etsybaev Oct 12, 2021
6176275
Merge branch 'master' into etsybaev/6423-destination-mysql-ssl-strict…
etsybaev Oct 13, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
*
!Dockerfile
!build
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
FROM airbyte/integration-base-java:dev

WORKDIR /airbyte

ENV APPLICATION destination-mysql-strict-encrypt

COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/destination-mysql-strict-encrypt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# MySql Strict Encrypt Test Configuration

In order to test the MySql destination, you need to have the up and running MySql database that has SSL enabled.

This connector inherits the MySql destination, but support SSL connections only.
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
plugins {
id 'application'
id 'airbyte-docker'
id 'airbyte-integration-test-java'
}

application {
mainClass = 'io.airbyte.integrations.destination.mysql.MySQLDestinationStrictEncrypt'
applicationDefaultJvmArgs = ['-XX:MaxRAMPercentage=75.0']
}

dependencies {
implementation project(':airbyte-db:lib')
implementation project(':airbyte-integrations:bases:base-java')
implementation project(':airbyte-protocol:models')
implementation project(':airbyte-integrations:connectors:destination-jdbc')
implementation project(':airbyte-integrations:connectors:destination-mysql')

implementation 'mysql:mysql-connector-java:8.0.22'

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-mysql')
integrationTestJavaImplementation "org.testcontainers:mysql:1.15.1"

implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-normalization').airbyteDocker.outputs)
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.mysql;

import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.spec_modification.SpecModifyingDestination;
import io.airbyte.protocol.models.ConnectorSpecification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySQLDestinationStrictEncrypt extends SpecModifyingDestination implements Destination {

private static final Logger LOGGER = LoggerFactory.getLogger(MySQLDestinationStrictEncrypt.class);

public MySQLDestinationStrictEncrypt() {
super(MySQLDestination.sshWrappedDestination());
}

@Override
public ConnectorSpecification modifySpec(ConnectorSpecification originalSpec) {
final ConnectorSpecification spec = Jsons.clone(originalSpec);
((ObjectNode) spec.getConnectionSpecification().get("properties")).remove("ssl");
return spec;
}

public static void main(String[] args) throws Exception {
final Destination destination = new MySQLDestinationStrictEncrypt();
LOGGER.info("starting destination: {}", MySQLDestinationStrictEncrypt.class);
new IntegrationRunner(destination).run(args);
LOGGER.info("completed destination: {}", MySQLDestinationStrictEncrypt.class);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.mysql;

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Databases;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.sql.SQLException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.jooq.JSONFormat;
import org.jooq.JSONFormat.RecordFormat;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.MySQLContainer;

public class MySQLStrictEncryptDestinationAcceptanceTest extends DestinationAcceptanceTest {

private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT);

private MySQLContainer<?> db;
private final ExtendedNameTransformer namingResolver = new MySQLNameTransformer();

@Override
protected String getImageName() {
return "airbyte/destination-mysql-strict-encrypt:dev";
}

@Override
protected boolean supportsDBT() {
return true;
}

@Override
protected boolean implementsNamespaces() {
return true;
}

@Override
protected boolean supportsNormalization() {
return true;
}

@Override
protected JsonNode getConfig() {
return Jsons.jsonNode(ImmutableMap.builder()
.put("host", db.getHost())
.put("username", db.getUsername())
.put("password", db.getPassword())
.put("database", db.getDatabaseName())
.put("port", db.getFirstMappedPort())
.build());
}

@Override
protected JsonNode getFailCheckConfig() {
return Jsons.jsonNode(ImmutableMap.builder()
.put("host", db.getHost())
.put("username", db.getUsername())
.put("password", "wrong password")
.put("database", db.getDatabaseName())
.put("port", db.getFirstMappedPort())
.build());
}

@Override
protected String getDefaultSchema(JsonNode config) {
if (config.get("database") == null) {
return null;
}
return config.get("database").asText();
}

@Override
protected List<JsonNode> retrieveRecords(TestDestinationEnv testEnv,
String streamName,
String namespace,
JsonNode streamSchema)
throws Exception {
return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace)
.stream()
.map(r -> Jsons.deserialize(r.get(JavaBaseConstants.COLUMN_NAME_DATA).asText()))
.collect(Collectors.toList());
}

private List<JsonNode> retrieveRecordsFromTable(String tableName, String schemaName) throws SQLException {
return Databases.createDatabase(
db.getUsername(),
db.getPassword(),
String.format("jdbc:mysql://%s:%s/%s?useSSL=true&requireSSL=true&verifyServerCertificate=false",
db.getHost(),
db.getFirstMappedPort(),
db.getDatabaseName()),
"com.mysql.cj.jdbc.Driver",
SQLDialect.MYSQL).query(
ctx -> ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(r -> r.formatJSON(JSON_FORMAT))
.map(Jsons::deserialize)
.collect(Collectors.toList()));
}

@Override
protected List<JsonNode> retrieveNormalizedRecords(TestDestinationEnv testEnv, String streamName, String namespace) throws Exception {
String tableName = namingResolver.getIdentifier(streamName);
String schema = namingResolver.getIdentifier(namespace);
return retrieveRecordsFromTable(tableName, schema);
}

@Override
protected List<String> resolveIdentifier(String identifier) {
final List<String> result = new ArrayList<>();
final String resolved = namingResolver.getIdentifier(identifier);
result.add(identifier);
result.add(resolved);
if (!resolved.startsWith("\"")) {
result.add(resolved.toLowerCase());
}
return result;
}

@Override
protected void setup(TestDestinationEnv testEnv) {
db = new MySQLContainer<>("mysql:8.0");
db.start();
setLocalInFileToTrue();
revokeAllPermissions();
grantCorrectPermissions();
}

private void setLocalInFileToTrue() {
executeQuery("set global local_infile=true");
}

private void revokeAllPermissions() {
executeQuery("REVOKE ALL PRIVILEGES, GRANT OPTION FROM " + db.getUsername() + "@'%';");
}

private void grantCorrectPermissions() {
executeQuery("GRANT ALTER, CREATE, INSERT, SELECT, DROP ON *.* TO " + db.getUsername() + "@'%';");
}

private void executeQuery(String query) {
try {
Databases.createDatabase(
"root",
"test",
String.format("jdbc:mysql://%s:%s/%s?useSSL=true&requireSSL=true&verifyServerCertificate=false",
db.getHost(),
db.getFirstMappedPort(),
db.getDatabaseName()),
"com.mysql.cj.jdbc.Driver",
SQLDialect.MYSQL).query(
ctx -> ctx
.execute(query));
} catch (SQLException e) {
throw new RuntimeException(e);
}
}

@Override
protected void tearDown(TestDestinationEnv testEnv) {
db.stop();
db.close();
}

@Override
@Test
public void testCustomDbtTransformations() {
// We need to create view for testing custom dbt transformations
executeQuery("GRANT CREATE VIEW ON *.* TO " + db.getUsername() + "@'%';");
}

@Test
public void testJsonSync() throws Exception {
final String catalogAsText = "{\n"
+ " \"streams\": [\n"
+ " {\n"
+ " \"name\": \"exchange_rate\",\n"
+ " \"json_schema\": {\n"
+ " \"properties\": {\n"
+ " \"id\": {\n"
+ " \"type\": \"integer\"\n"
+ " },\n"
+ " \"data\": {\n"
+ " \"type\": \"string\"\n"
+ " }"
+ " }\n"
+ " }\n"
+ " }\n"
+ " ]\n"
+ "}\n";

final AirbyteCatalog catalog = Jsons.deserialize(catalogAsText, AirbyteCatalog.class);
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog);
final List<AirbyteMessage> messages = Lists.newArrayList(
new AirbyteMessage()
.withType(Type.RECORD)
.withRecord(new AirbyteRecordMessage()
.withStream(catalog.getStreams().get(0).getName())
.withEmittedAt(Instant.now().toEpochMilli())
.withData(Jsons.jsonNode(ImmutableMap.builder()
.put("id", 1)
.put("data", "{\"name\":\"Conferência Faturamento - Custo - Taxas - Margem - Resumo ano inicial até -2\",\"description\":null}")
.build()))),
new AirbyteMessage()
.withType(Type.STATE)
.withState(new AirbyteStateMessage().withData(Jsons.jsonNode(ImmutableMap.of("checkpoint", 2)))));

final JsonNode config = getConfig();
final String defaultSchema = getDefaultSchema(config);
runSyncAndVerifyStateOutput(config, messages, configuredCatalog, false);
retrieveRawRecordsAndAssertSameMessages(catalog, messages, defaultSchema);
}

@Override
@Test
public void testLineBreakCharacters() {
// overrides test with a no-op until we handle full UTF-8 in the destination
}

protected void assertSameValue(JsonNode expectedValue, JsonNode actualValue) {
if (expectedValue.isBoolean()) {
// Boolean in MySQL are stored as TINYINT (0 or 1) so we force them to boolean values here
assertEquals(expectedValue.asBoolean(), actualValue.asBoolean());
} else {
assertEquals(expectedValue, actualValue);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.mysql;

import static org.junit.jupiter.api.Assertions.assertEquals;

import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.protocol.models.ConnectorSpecification;
import org.junit.jupiter.api.Test;

class MySqlDestinationStrictEncryptTest {

@Test
void testGetSpec() throws Exception {
System.out.println(new MySQLDestinationStrictEncrypt().spec().getConnectionSpecification());
assertEquals(Jsons.deserialize(MoreResources.readResource("expected_spec.json"), ConnectorSpecification.class),
new MySQLDestinationStrictEncrypt().spec());
}

}
Loading