Skip to content

Commit

Permalink
JSON configuration persistence (#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens authored Aug 10, 2020
1 parent 0589033 commit 4319ba4
Show file tree
Hide file tree
Showing 17 changed files with 328 additions and 3 deletions.
29 changes: 29 additions & 0 deletions conduit-config-persistence/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
plugins {
id 'java'
id "com.diffplug.spotless" version "5.1.1"
}

group 'io.dataline.conduit'
version '0.1.0'

repositories {
mavenCentral()
}

compileJava.dependsOn 'spotlessApply'

spotless {
java {
googleJavaFormat('1.8')
}
}

dependencies {
testCompile group: 'junit', name: 'junit', version: '4.12'

compile group: "com.fasterxml.jackson.core", name: "jackson-databind", version: "2.9.8"
compile group: "com.networknt", name: "json-schema-validator", version: "1.0.42"

implementation project(':conduit-config')

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package io.dataline.conduit.persistence;

import java.util.Set;

public interface ConfigPersistence {
<T> T getConfig(PersistenceConfigType persistenceConfigType, String configId, Class<T> clazz);

<T> Set<T> getConfigs(PersistenceConfigType persistenceConfigType, Class<T> clazz);

<T> void writeConfig(
PersistenceConfigType persistenceConfigType, String configId, T config, Class<T> clazz);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package io.dataline.conduit.persistence;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.networknt.schema.*;
import io.dataline.conduit.conduit_config.ConfigSchema;
import io.dataline.conduit.conduit_config.StandardScheduleConfiguration;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

public class ConfigPersistenceImpl implements ConfigPersistence {
private static final String CONFIG_STORAGE_ROOT = "data/config/";
private static final String CONFIG_SCHEMA_ROOT = "conduit-config/src/main/resources/json/";

private final ObjectMapper objectMapper;
private final SchemaValidatorsConfig schemaValidatorsConfig;
private final JsonSchemaFactory jsonSchemaFactory;

public ConfigPersistenceImpl() {
objectMapper = new ObjectMapper();
schemaValidatorsConfig = new SchemaValidatorsConfig();
jsonSchemaFactory = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V7);
}

@Override
public <T> T getConfig(
PersistenceConfigType persistenceConfigType, String configId, Class<T> clazz) {
// find file
File configFile = getFileOrThrow(persistenceConfigType, configId);

// validate file with schema
validateJson(configFile, persistenceConfigType, configId);

// cast file to type
return fileToPojo(configFile, clazz);
}

@Override
public <T> Set<T> getConfigs(PersistenceConfigType persistenceConfigType, Class<T> clazz) {
return getConfigIds(persistenceConfigType).stream()
.map(configId -> getConfig(persistenceConfigType, configId, clazz))
.collect(Collectors.toSet());
}

@Override
public <T> void writeConfig(
PersistenceConfigType persistenceConfigType, String configId, T config, Class<T> clazz) {
try {
objectMapper.writeValue(new File(getConfigPath(persistenceConfigType, configId)), config);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private JsonSchema getSchema(PersistenceConfigType persistenceConfigType) {
String configSchemaFilename =
standardConfigTypeToConfigSchema(persistenceConfigType).getSchemaFilename();
File schemaFile = new File(String.format("%s/%s", CONFIG_SCHEMA_ROOT, configSchemaFilename));
JsonNode schema;
try {
schema = objectMapper.readTree(schemaFile);
} catch (IOException e) {
throw new RuntimeException(e);
}
return jsonSchemaFactory.getSchema(schema, schemaValidatorsConfig);
}

private Set<Path> getFiles(PersistenceConfigType persistenceConfigType) {
String configDirPath = getConfigDirectory(persistenceConfigType);
try {
return Files.list(new File(configDirPath).toPath()).collect(Collectors.toSet());
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private String getConfigDirectory(PersistenceConfigType persistenceConfigType) {
return String.format("%s/%s", CONFIG_STORAGE_ROOT, persistenceConfigType.toString());
}

private String getConfigPath(PersistenceConfigType persistenceConfigType, String configId) {
return String.format("%s/%s", getConfigDirectory(persistenceConfigType), getFilename(configId));
}

private Set<String> getConfigIds(PersistenceConfigType persistenceConfigType) {
return getFiles(persistenceConfigType).stream()
.map(path -> path.getFileName().toString().replace(".json", ""))
.collect(Collectors.toSet());
}

private Optional<Path> getFile(PersistenceConfigType persistenceConfigType, String id) {
String configPath = getConfigPath(persistenceConfigType, id);
final Path path = Paths.get(configPath);
if (Files.exists(path)) {
return Optional.of(path);
} else {
return Optional.empty();
}
}

private String getFilename(String id) {
return String.format("%s.json", id);
}

private ConfigSchema standardConfigTypeToConfigSchema(
PersistenceConfigType persistenceConfigType) {
switch (persistenceConfigType) {
case SOURCE_CONNECTION_CONFIGURATION:
return ConfigSchema.SOURCE_CONNECTION_CONFIGURATION;
case STANDARD_CONNECTION_STATUS:
return ConfigSchema.STANDARD_CONNECTION_STATUS;
case STANDARD_DISCOVERY_OUTPUT:
return ConfigSchema.STANDARD_DISCOVERY_OUTPUT;
case DESTINATION_CONNECTION_CONFIGURATION:
return ConfigSchema.DESTINATION_CONNECTION_CONFIGURATION;
case STANDARD_SYNC_CONFIGURATION:
return ConfigSchema.STANDARD_SYNC_CONFIGURATION;
case STANDARD_SYNC_SUMMARY:
return ConfigSchema.STANDARD_SYNC_SUMMARY;
case STANDARD_SYNC_STATE:
return ConfigSchema.STANDARD_SYNC_STATE;
case STATE:
return ConfigSchema.STATE;
case STANDARD_SYNC_SCHEDULE:
return ConfigSchema.STANDARD_SYNC_SCHEDULE;
default:
throw new RuntimeException(
String.format(
"No mapping from StandardConfigType to ConfigSchema for %s",
persistenceConfigType));
}
}

private void validateJson(
File configFile, PersistenceConfigType persistenceConfigType, String configId) {
JsonNode configJson;
try {
configJson = objectMapper.readTree(configFile);
} catch (IOException e) {
throw new RuntimeException(e);
}

JsonSchema schema = getSchema(persistenceConfigType);
Set<ValidationMessage> validationMessages = schema.validate(configJson);
if (validationMessages.size() > 0) {
throw new IllegalStateException(
String.format(
"json schema validation failed. type: %s id: %s \n errors: %s \n schema: \n%s \n object: \n%s",
persistenceConfigType,
configId,
validationMessages.stream()
.map(ValidationMessage::toString)
.collect(Collectors.joining(",")),
schema.getSchemaNode().toPrettyString(),
configJson.toPrettyString()));
}
}

private File getFileOrThrow(PersistenceConfigType persistenceConfigType, String configId) {
return getFile(persistenceConfigType, configId)
.map(Path::toFile)
.orElseThrow(
() ->
new RuntimeException(
String.format("config %s %s not found", persistenceConfigType, configId)));
}

private <T> T fileToPojo(File file, Class<T> clazz) {
try {
return objectMapper.readValue(file, clazz);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.dataline.conduit.persistence;

public enum PersistenceConfigType {
SOURCE_CONNECTION_CONFIGURATION,
STANDARD_CONNECTION_STATUS,
STANDARD_DISCOVERY_OUTPUT,
DESTINATION_CONNECTION_CONFIGURATION,
STANDARD_SYNC_CONFIGURATION,
STANDARD_SYNC_SUMMARY,
STANDARD_SYNC_STATE,
STATE,
STANDARD_SYNC_SCHEDULE
}
1 change: 1 addition & 0 deletions conduit-config/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,5 @@ dependencies {
}

jsonSchema2Pojo {
targetPackage = 'io.dataline.conduit.conduit_config'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.dataline.conduit.conduit_config;

public enum ConfigSchema {
SOURCE_CONNECTION_CONFIGURATION("SourceConnectionConfiguration.json"),
STANDARD_CONNECTION_STATUS("StandardConnectionStatus.json"),
STANDARD_DISCOVERY_OUTPUT("StandardDiscoveryOutput.json"),
DESTINATION_CONNECTION_CONFIGURATION("DestinationConnectionConfiguration.json"),
STANDARD_SYNC_CONFIGURATION("StandardSyncConfiguration.json"),
STANDARD_SYNC_SUMMARY("StandardSyncSummary.json"),
STANDARD_SYNC_STATE("StandardSyncState.json"),
STANDARD_SYNC_SCHEDULE("StandardSyncSchedule.json"),
STATE("State.json");

private final String schemaFilename;

ConfigSchema(String schemaFilename) {
this.schemaFilename = schemaFilename;
}

public String getSchemaFilename() {
return schemaFilename;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "DestinationConnectionConfiguration",
"title": "DestinationConnectionConfiguration",
"description": "information required for connection to a destination.",
"type": "object",
"required": ["destinationSpecificationId", "configuration"],
"additionalProperties": false,
"properties": {
"destinationSpecificationId": {
"type": "string",
"format": "uuid"
},
"configuration": {
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "SourceConnectionConfiguration",
"title": "SourceConnectionConfiguration",
"description": "information required for connection to a destination.", "type": "object",
"required": ["sourceSpecificationId", "configuration"],
"additionalProperties": false,
"properties": {
"sourceSpecificationId": {
"type": "string",
"format": "uuid"
},
"configuration": {
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"description": "describes the result of a 'test connection' action.",
"type": "object",
"required": ["status"],
"additionalProperties": false,
"properties": {
"status": {
"type": "string",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
"schema": {
"description": "describes the available schema.",
"type": "object",
"required": ["tables"],
"additionalProperties": false,
"properties": {
"tables": {
"type": "array",
Expand All @@ -16,6 +18,7 @@
"name",
"columns"
],
"additionalProperties": false,
"properties": {
"name": {
"type": "string"
Expand All @@ -28,6 +31,7 @@
"name",
"dataType"
],
"additionalProperties": false,
"properties": {
"name": {
"type": "string"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"description": "describes the standard output for any discovery run.",
"type": "object",
"required": ["schema"],
"additionalProperties": false,
"properties": {
"schema": {
"description": "describes the available schema.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
"title": "StandardSyncConfiguration",
"description": "configuration required for sync for ALL taps",
"type": "object",
"required": ["syncMode", "schema"],
"additionalProperties": false,
"properties": {
"syncMode": {
"type": "string",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "StandardScheduleConfiguration",
"$id": "https://dataline.io/docs/models/StandardScheduleConfiguration.json",
"title": "StandardScheduleConfiguration",
"type": "object",
"required": ["timeUnit", "units"],
"additionalProperties": false,
"properties": {
"timeUnit": {
"type": "string",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
"title": "StandardSyncSummary",
"description": "standard information output by ALL taps for a sync step (our version of state.json)",
"type": "object",
"required": ["attemptId", "tables", "recordsSynced", "version", "status", "startTime", "endTime"],
"additionalProperties": false,
"properties": {
"attemptId": {
"type": "string",
Expand Down
17 changes: 17 additions & 0 deletions conduit-config/src/main/resources/json/State.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "DestinationConnectionConfiguration",
"title": "DestinationConnectionConfiguration",
"description": "information output by the connection.",
"type": "object",
"required": ["connectionId", "state"],
"additionalProperties": false,
"properties": {
"connectionId": {
"type": "string",
"format": "uuid"
},
"state": {
}
}
}
Loading

0 comments on commit 4319ba4

Please sign in to comment.