diff --git a/.github/workflows/gradle.yml b/.github/workflows/gradle.yml
index 356d99ae81e6f..e7d985c2439b5 100644
--- a/.github/workflows/gradle.yml
+++ b/.github/workflows/gradle.yml
@@ -15,5 +15,3 @@ jobs:
- name: Test
run: ./tools/app/test.sh
-
-
diff --git a/dataline-commons/src/main/resources/log4j2.xml b/dataline-commons/src/main/resources/log4j2.xml
index 13be796af2f89..a31cee1e8101f 100644
--- a/dataline-commons/src/main/resources/log4j2.xml
+++ b/dataline-commons/src/main/resources/log4j2.xml
@@ -1,29 +1,33 @@
-
-
-
-
-
+
+
+
+
+
-
-
-
-
+
+
+
+
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
-
-
-
-
-
-
-
+
diff --git a/dataline-integrations/singer/csv/destination/Dockerfile b/dataline-integrations/singer/csv/destination/Dockerfile
new file mode 100644
index 0000000000000..f126fa501823e
--- /dev/null
+++ b/dataline-integrations/singer/csv/destination/Dockerfile
@@ -0,0 +1,16 @@
+FROM python:3.7-slim
+
+WORKDIR /singer
+
+ENV VIRTUAL_ENV=/singer/env
+RUN python -m venv $VIRTUAL_ENV
+ENV PATH="$VIRTUAL_ENV/bin:$PATH"
+
+# Install dependencies:
+COPY requirements.txt .
+RUN python -m pip install --upgrade pip && \
+ pip install -r requirements.txt
+
+WORKDIR /singer/data
+
+ENTRYPOINT ["target-csv"]
diff --git a/dataline-integrations/singer/csv/destination/requirements.txt b/dataline-integrations/singer/csv/destination/requirements.txt
new file mode 100644
index 0000000000000..aa8012a1919a9
--- /dev/null
+++ b/dataline-integrations/singer/csv/destination/requirements.txt
@@ -0,0 +1 @@
+target-csv == 0.3.0
diff --git a/dataline-integrations/singer/exchangerateapi_io/source/Dockerfile b/dataline-integrations/singer/exchangerateapi_io/source/Dockerfile
new file mode 100644
index 0000000000000..8bde97fc61335
--- /dev/null
+++ b/dataline-integrations/singer/exchangerateapi_io/source/Dockerfile
@@ -0,0 +1,16 @@
+FROM python:3.7-slim
+
+WORKDIR /singer
+
+ENV VIRTUAL_ENV=/singer/env
+RUN python -m venv $VIRTUAL_ENV
+ENV PATH="$VIRTUAL_ENV/bin:$PATH"
+
+# Install dependencies:
+COPY requirements.txt .
+RUN python -m pip install --upgrade pip && \
+ pip install -r requirements.txt
+
+WORKDIR /singer/data
+
+ENTRYPOINT ["tap-exchangeratesapi"]
diff --git a/dataline-integrations/singer/exchangerateapi_io/source/requirements.txt b/dataline-integrations/singer/exchangerateapi_io/source/requirements.txt
new file mode 100644
index 0000000000000..35004495bf5c9
--- /dev/null
+++ b/dataline-integrations/singer/exchangerateapi_io/source/requirements.txt
@@ -0,0 +1 @@
+tap-exchangeratesapi == 0.1.1
diff --git a/dataline-integrations/singer/postgres/destination/Dockerfile b/dataline-integrations/singer/postgres/destination/Dockerfile
new file mode 100644
index 0000000000000..f8c649f196859
--- /dev/null
+++ b/dataline-integrations/singer/postgres/destination/Dockerfile
@@ -0,0 +1,16 @@
+FROM python:3.7-slim
+
+WORKDIR /singer
+
+ENV VIRTUAL_ENV=/singer/env
+RUN python -m venv $VIRTUAL_ENV
+ENV PATH="$VIRTUAL_ENV/bin:$PATH"
+
+# Install dependencies:
+COPY requirements.txt .
+RUN python -m pip install --upgrade pip && \
+ pip install -r requirements.txt
+
+WORKDIR /singer/data
+
+ENTRYPOINT ["target-postgres"]
diff --git a/dataline-integrations/singer/postgres/destination/requirements.txt b/dataline-integrations/singer/postgres/destination/requirements.txt
new file mode 100644
index 0000000000000..7f447c14bb463
--- /dev/null
+++ b/dataline-integrations/singer/postgres/destination/requirements.txt
@@ -0,0 +1,2 @@
+psycopg2-binary == 2.8.5
+target-postgres == 1.1.3
diff --git a/dataline-integrations/singer/postgres/source/Dockerfile b/dataline-integrations/singer/postgres/source/Dockerfile
new file mode 100644
index 0000000000000..df075e01d9e66
--- /dev/null
+++ b/dataline-integrations/singer/postgres/source/Dockerfile
@@ -0,0 +1,22 @@
+FROM python:3.7-slim
+
+WORKDIR /singer
+
+ENV VIRTUAL_ENV=/singer/env
+RUN python -m venv $VIRTUAL_ENV
+ENV PATH="$VIRTUAL_ENV/bin:$PATH"
+
+# need gcc to compile psycopg2
+RUN apt-get update && \
+ apt-get install -y libpq-dev gcc
+
+# Install dependencies:
+COPY requirements.txt .
+RUN python -m pip install --upgrade pip && \
+ pip install -r requirements.txt
+
+RUN apt-get autoremove -y gcc
+
+WORKDIR /singer/data
+
+ENTRYPOINT ["tap-postgres"]
diff --git a/dataline-integrations/singer/postgres/source/requirements.txt b/dataline-integrations/singer/postgres/source/requirements.txt
new file mode 100644
index 0000000000000..ba77b235a50b4
--- /dev/null
+++ b/dataline-integrations/singer/postgres/source/requirements.txt
@@ -0,0 +1,2 @@
+psycopg2 == 2.7.4
+tap-postgres == 0.1.0
diff --git a/dataline-scheduler/src/main/java/io/dataline/scheduler/JobSubmitter.java b/dataline-scheduler/src/main/java/io/dataline/scheduler/JobSubmitter.java
index 781a193dc3d86..f926cefc62efd 100644
--- a/dataline-scheduler/src/main/java/io/dataline/scheduler/JobSubmitter.java
+++ b/dataline-scheduler/src/main/java/io/dataline/scheduler/JobSubmitter.java
@@ -48,6 +48,7 @@
import org.slf4j.LoggerFactory;
public class JobSubmitter implements Runnable {
+
private static final Logger LOGGER = LoggerFactory.getLogger(JobSubmitter.class);
private final ExecutorService threadPool;
diff --git a/dataline-scheduler/src/main/java/io/dataline/scheduler/WorkerWrapper.java b/dataline-scheduler/src/main/java/io/dataline/scheduler/WorkerWrapper.java
index 64a46a84ebbfc..98c6a23b58c57 100644
--- a/dataline-scheduler/src/main/java/io/dataline/scheduler/WorkerWrapper.java
+++ b/dataline-scheduler/src/main/java/io/dataline/scheduler/WorkerWrapper.java
@@ -43,6 +43,7 @@
import org.slf4j.LoggerFactory;
public class WorkerWrapper implements Runnable {
+
private static final Logger LOGGER = LoggerFactory.getLogger(WorkerWrapper.class);
private final long jobId;
@@ -106,7 +107,7 @@ public void run() {
FileUtils.forceMkdir(workspacesRoot.toFile());
final Path workspaceRoot = workspacesRoot.resolve(String.valueOf(jobId));
FileUtils.forceMkdir(workspaceRoot.toFile());
- OutputAndStatus outputAndStatus = worker.run(input, workspaceRoot.toString());
+ OutputAndStatus outputAndStatus = worker.run(input, workspaceRoot);
switch (outputAndStatus.getStatus()) {
case FAILED:
diff --git a/dataline-workers/src/main/java/io/dataline/workers/DockerProcessRunner.java b/dataline-workers/src/main/java/io/dataline/workers/DockerProcessRunner.java
new file mode 100644
index 0000000000000..bac575a51c4f2
--- /dev/null
+++ b/dataline-workers/src/main/java/io/dataline/workers/DockerProcessRunner.java
@@ -0,0 +1,61 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2020 Dataline
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package io.dataline.workers;
+
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.lang.ProcessBuilder.Redirect;
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DockerProcessRunner {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(DockerProcessRunner.class);
+
+ public static void main(String[] args) throws IOException, InterruptedException {
+ ProcessBuilder processBuilderTap =
+ new ProcessBuilder("docker", "run", "dataline/integration-singer-exchangerateapi_io-source")
+ .redirectError(Redirect.INHERIT);
+
+ ProcessBuilder processBuilderTarget =
+ new ProcessBuilder(
+ "docker",
+ "run",
+ "-i",
+ "-v",
+ "/tmp/singer:/singer/data",
+ "dataline/integration-singer-csv-destination")
+ .redirectError(Redirect.INHERIT)
+ .redirectOutput(Redirect.INHERIT);
+
+ List processes =
+ ProcessBuilder.startPipeline(Lists.newArrayList(processBuilderTap, processBuilderTarget));
+
+ for (Process process : processes) {
+ process.waitFor();
+ }
+ }
+}
diff --git a/dataline-workers/src/main/java/io/dataline/workers/EchoWorker.java b/dataline-workers/src/main/java/io/dataline/workers/EchoWorker.java
index dbc5269fb5413..362643e68ee20 100644
--- a/dataline-workers/src/main/java/io/dataline/workers/EchoWorker.java
+++ b/dataline-workers/src/main/java/io/dataline/workers/EchoWorker.java
@@ -24,6 +24,7 @@
package io.dataline.workers;
+import java.nio.file.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,7 +34,7 @@ public class EchoWorker implements Worker {
public EchoWorker() {}
@Override
- public OutputAndStatus run(String string, String workspaceRoot) {
+ public OutputAndStatus run(String string, Path workspaceRoot) {
LOGGER.info("Hello World. input: {}, workspace root: {}", string, workspaceRoot);
return new OutputAndStatus<>(JobStatus.SUCCESSFUL, "echoed");
}
diff --git a/dataline-workers/src/main/java/io/dataline/workers/Worker.java b/dataline-workers/src/main/java/io/dataline/workers/Worker.java
index 87c3c288f691c..9c53a6e820deb 100644
--- a/dataline-workers/src/main/java/io/dataline/workers/Worker.java
+++ b/dataline-workers/src/main/java/io/dataline/workers/Worker.java
@@ -24,12 +24,14 @@
package io.dataline.workers;
+import java.nio.file.Path;
+
public interface Worker {
/**
* Blocking call to run the worker's workflow. Once this is complete, getStatus should return
* either COMPLETE, FAILED, or CANCELLED.
*/
- OutputAndStatus run(InputType inputType, String workspacePath)
+ OutputAndStatus run(InputType inputType, Path workspacePath)
throws InvalidCredentialsException, InvalidCatalogException;
void cancel();
diff --git a/dataline-workers/src/main/java/io/dataline/workers/singer/BaseSingerWorker.java b/dataline-workers/src/main/java/io/dataline/workers/singer/BaseSingerWorker.java
index b6f9d67bf5295..b446a4b6015b0 100644
--- a/dataline-workers/src/main/java/io/dataline/workers/singer/BaseSingerWorker.java
+++ b/dataline-workers/src/main/java/io/dataline/workers/singer/BaseSingerWorker.java
@@ -27,38 +27,37 @@
import io.dataline.workers.JobStatus;
import io.dataline.workers.OutputAndStatus;
import io.dataline.workers.Worker;
-import java.io.BufferedReader;
-import java.io.FileReader;
-import java.io.FileWriter;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
+import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class BaseSingerWorker
implements Worker {
+
private static final Logger LOGGER = LoggerFactory.getLogger(BaseSingerWorker.class);
- protected JobStatus jobStatus;
+ protected final SingerConnector connector;
- private final String singerExecutablePath;
+ private JobStatus jobStatus;
- protected BaseSingerWorker(String singerExecutablePath) {
- this.singerExecutablePath = singerExecutablePath;
+ protected BaseSingerWorker(SingerConnector connector) {
+ this.connector = connector;
}
@Override
- public OutputAndStatus run(InputType inputType, String workspaceRoot) {
+ public OutputAndStatus run(InputType inputType, Path workspaceRoot) {
return runInternal(inputType, workspaceRoot);
}
- abstract OutputAndStatus runInternal(InputType inputType, String workspaceRoot);
+ abstract OutputAndStatus runInternal(InputType inputType, Path workspaceRoot);
protected void cancelHelper(Process workerProcess) {
try {
- jobStatus = JobStatus.FAILED;
+ updateJobStatus(JobStatus.FAILED);
workerProcess.destroy();
workerProcess.waitFor(10, TimeUnit.SECONDS);
if (workerProcess.isAlive()) {
@@ -69,30 +68,30 @@ protected void cancelHelper(Process workerProcess) {
}
}
- protected String readFileFromWorkspace(String workspaceRoot, String fileName) {
- try (FileReader fileReader = new FileReader(getWorkspaceFilePath(workspaceRoot, fileName));
- BufferedReader br = new BufferedReader(fileReader)) {
- return br.lines().collect(Collectors.joining("\n"));
+ protected static String readFile(Path workspaceRoot, String fileName) {
+ try {
+ Path filePath = workspaceRoot.resolve(fileName);
+ return FileUtils.readFileToString(filePath.toFile(), StandardCharsets.UTF_8);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
- protected String writeFileToWorkspace(String workspaceRoot, String fileName, String contents) {
- String filePath = getWorkspaceFilePath(workspaceRoot, fileName);
- try (FileWriter fileWriter = new FileWriter(filePath)) {
- fileWriter.write(contents);
+ protected static Path writeFile(Path workspaceRoot, String fileName, String contents) {
+ try {
+ Path filePath = workspaceRoot.resolve(fileName);
+ FileUtils.writeStringToFile(filePath.toFile(), contents, StandardCharsets.UTF_8);
return filePath;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
- protected String getExecutableAbsolutePath() {
- return singerExecutablePath;
+ protected static Path getFullPath(Path workspaceRoot, String fileName) {
+ return workspaceRoot.resolve(fileName);
}
- private String getWorkspaceFilePath(String workspaceRoot, String fileName) {
- return Path.of(workspaceRoot).resolve(fileName).toAbsolutePath().toString();
+ protected void updateJobStatus(JobStatus jobStatus) {
+ this.jobStatus = jobStatus;
}
}
diff --git a/dataline-workers/src/main/java/io/dataline/workers/singer/SingerCheckConnectionWorker.java b/dataline-workers/src/main/java/io/dataline/workers/singer/SingerCheckConnectionWorker.java
index 8cf56b49c8667..e481ac17b9903 100644
--- a/dataline-workers/src/main/java/io/dataline/workers/singer/SingerCheckConnectionWorker.java
+++ b/dataline-workers/src/main/java/io/dataline/workers/singer/SingerCheckConnectionWorker.java
@@ -30,24 +30,26 @@
import io.dataline.workers.CheckConnectionWorker;
import io.dataline.workers.JobStatus;
import io.dataline.workers.OutputAndStatus;
+import java.nio.file.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SingerCheckConnectionWorker
extends BaseSingerWorker
implements CheckConnectionWorker {
+
private static final Logger LOGGER = LoggerFactory.getLogger(SingerCheckConnectionWorker.class);
private final SingerDiscoveryWorker singerDiscoveryWorker;
- public SingerCheckConnectionWorker(String singerExecutablePath) {
- super(singerExecutablePath);
- this.singerDiscoveryWorker = new SingerDiscoveryWorker(singerExecutablePath);
+ public SingerCheckConnectionWorker(SingerConnector connector) {
+ super(connector);
+ this.singerDiscoveryWorker = new SingerDiscoveryWorker(connector);
}
@Override
OutputAndStatus runInternal(
- ConnectionImplementation connectionImplementation, String workspaceRoot) {
+ ConnectionImplementation connectionImplementation, Path workspaceRoot) {
OutputAndStatus outputAndStatus =
singerDiscoveryWorker.runInternal(connectionImplementation, workspaceRoot);
StandardConnectionStatus connectionStatus = new StandardConnectionStatus();
diff --git a/dataline-workers/src/main/java/io/dataline/workers/singer/SingerConnector.java b/dataline-workers/src/main/java/io/dataline/workers/singer/SingerConnector.java
index c937d429b2b50..55a81d9f3371d 100644
--- a/dataline-workers/src/main/java/io/dataline/workers/singer/SingerConnector.java
+++ b/dataline-workers/src/main/java/io/dataline/workers/singer/SingerConnector.java
@@ -25,7 +25,5 @@
package io.dataline.workers.singer;
interface SingerConnector {
- String getPythonVirtualEnvName();
-
- String getExecutableName();
+ String getImageName();
}
diff --git a/dataline-workers/src/main/java/io/dataline/workers/singer/SingerDiscoveryWorker.java b/dataline-workers/src/main/java/io/dataline/workers/singer/SingerDiscoveryWorker.java
index 7e65d0cc5f642..ba6fc309801c2 100644
--- a/dataline-workers/src/main/java/io/dataline/workers/singer/SingerDiscoveryWorker.java
+++ b/dataline-workers/src/main/java/io/dataline/workers/singer/SingerDiscoveryWorker.java
@@ -40,7 +40,6 @@
import io.dataline.config.Table;
import io.dataline.workers.DiscoverSchemaWorker;
import io.dataline.workers.OutputAndStatus;
-import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
@@ -52,21 +51,23 @@
public class SingerDiscoveryWorker
extends BaseSingerWorker
implements DiscoverSchemaWorker {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(SingerDiscoveryWorker.class);
+
// TODO log errors to specified file locations
- private static Logger LOGGER = LoggerFactory.getLogger(SingerDiscoveryWorker.class);
private static String CONFIG_JSON_FILENAME = "config.json";
private static String CATALOG_JSON_FILENAME = "catalog.json";
private static String ERROR_LOG_FILENAME = "err.log";
private volatile Process workerProcess;
- public SingerDiscoveryWorker(String singerExecutablePath) {
- super(singerExecutablePath);
+ public SingerDiscoveryWorker(SingerConnector connector) {
+ super(connector);
}
@Override
OutputAndStatus runInternal(
- ConnectionImplementation connectionImplementation, String workspaceRoot) {
+ ConnectionImplementation connectionImplementation, Path workspaceRoot) {
// todo (cgardens) - just getting original impl to line up with new iface for now. this can be
// reduced.
final ObjectMapper objectMapper = new ObjectMapper();
@@ -79,23 +80,25 @@ OutputAndStatus runInternal(
// TODO use format converter here
// write config.json to disk
- String configPath = writeFileToWorkspace(workspaceRoot, CONFIG_JSON_FILENAME, configDotJson);
-
- String tapPath = getExecutableAbsolutePath();
-
- String catalogDotJsonPath =
- Path.of(workspaceRoot).resolve(CATALOG_JSON_FILENAME).toAbsolutePath().toString();
- String errorLogPath =
- Path.of(workspaceRoot).resolve(ERROR_LOG_FILENAME).toAbsolutePath().toString();
+ writeFile(workspaceRoot, CONFIG_JSON_FILENAME, configDotJson);
// exec
try {
- String[] cmd = {tapPath, "--config", configPath, "--discover"};
+ String[] cmd = {
+ "docker",
+ "run",
+ "-v",
+ String.format("%s:/singer/data", workspaceRoot.toString()),
+ connector.getImageName(),
+ "--config",
+ CONFIG_JSON_FILENAME,
+ "--discover"
+ };
workerProcess =
new ProcessBuilder(cmd)
- .redirectError(new File(errorLogPath))
- .redirectOutput(new File(catalogDotJsonPath))
+ .redirectError(getFullPath(workspaceRoot, ERROR_LOG_FILENAME).toFile())
+ .redirectOutput(getFullPath(workspaceRoot, CATALOG_JSON_FILENAME).toFile())
.start();
while (!workerProcess.waitFor(1, TimeUnit.MINUTES)) {
@@ -104,12 +107,12 @@ OutputAndStatus runInternal(
int exitCode = workerProcess.exitValue();
if (exitCode == 0) {
- String catalog = readFileFromWorkspace(workspaceRoot, CATALOG_JSON_FILENAME);
+ String catalog = readFile(workspaceRoot, CATALOG_JSON_FILENAME);
final SingerCatalog singerCatalog = jsonCatalogToTyped(catalog);
final StandardDiscoveryOutput discoveryOutput = toDiscoveryOutput(singerCatalog);
return new OutputAndStatus<>(SUCCESSFUL, discoveryOutput);
} else {
- String errLog = readFileFromWorkspace(workspaceRoot, ERROR_LOG_FILENAME);
+ String errLog = readFile(workspaceRoot, ERROR_LOG_FILENAME);
LOGGER.debug(
"Discovery job subprocess finished with exit code {}. Error log: {}", exitCode, errLog);
return new OutputAndStatus<>(FAILED);
diff --git a/dataline-workers/src/main/java/io/dataline/workers/singer/SingerTap.java b/dataline-workers/src/main/java/io/dataline/workers/singer/SingerTap.java
index 97cedab01caed..d5f2b83f671b7 100644
--- a/dataline-workers/src/main/java/io/dataline/workers/singer/SingerTap.java
+++ b/dataline-workers/src/main/java/io/dataline/workers/singer/SingerTap.java
@@ -26,23 +26,18 @@
public enum SingerTap implements SingerConnector {
// TODO
- S3_CSV("", ""),
- POSTGRES("tap-postgres", "tap-postgres"),
- STRIPE("", "");
+ S3_CSV(""),
+ EXCHANGERATEAPI_IO("dataline/integration-singer-exchangerateapi_io-source"),
+ POSTGRES("dataline/integration-singer-postgres-source"),
+ STRIPE("");
- private final String getPythonVirtualEnvName;
- private final String executableName;
+ private final String imageName;
- SingerTap(String getPythonVirtualEnvName, String executableName) {
- this.getPythonVirtualEnvName = getPythonVirtualEnvName;
- this.executableName = executableName;
+ SingerTap(String imageName) {
+ this.imageName = imageName;
}
- public String getPythonVirtualEnvName() {
- return getPythonVirtualEnvName;
- }
-
- public String getExecutableName() {
- return executableName;
+ public String getImageName() {
+ return imageName;
}
}
diff --git a/dataline-workers/src/main/java/io/dataline/workers/singer/SingerTarget.java b/dataline-workers/src/main/java/io/dataline/workers/singer/SingerTarget.java
index 7ccd5bf7ddb1e..63aeca25e41a4 100644
--- a/dataline-workers/src/main/java/io/dataline/workers/singer/SingerTarget.java
+++ b/dataline-workers/src/main/java/io/dataline/workers/singer/SingerTarget.java
@@ -26,25 +26,18 @@
public enum SingerTarget implements SingerConnector {
// TODO
- LOCAL_FILE("", ""),
- BIG_QUERY("", ""),
- POSTGRES("", "");
+ LOCAL_FILE("dataline/integration-singer-csv-destination"),
+ BIGQUERY(""),
+ POSTGRES("");
- private final String pythonVirtualEnvName;
- private final String executableName;
+ private final String imageName;
- SingerTarget(String pythonVirtualEnvName, String executableName) {
- this.pythonVirtualEnvName = pythonVirtualEnvName;
- this.executableName = executableName;
+ SingerTarget(String imageName) {
+ this.imageName = imageName;
}
@Override
- public String getPythonVirtualEnvName() {
- return pythonVirtualEnvName;
- }
-
- @Override
- public String getExecutableName() {
- return executableName;
+ public String getImageName() {
+ return imageName;
}
}
diff --git a/dataline-workers/src/main/java/io/dataline/workers/singer/postgres_tap/PostgresSingerTapConstants.java b/dataline-workers/src/main/java/io/dataline/workers/singer/postgres_tap/PostgresSingerTapConstants.java
deleted file mode 100644
index 556e42fbadb08..0000000000000
--- a/dataline-workers/src/main/java/io/dataline/workers/singer/postgres_tap/PostgresSingerTapConstants.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * MIT License
- *
- * Copyright (c) 2020 Dataline
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-package io.dataline.workers.singer.postgres_tap;
-
-public class PostgresSingerTapConstants {
- static String POSTGRES_SINGER_TAP = "/usr/local/lib/singer/tap-postgres/bin/tap-postgres";
-}
diff --git a/dataline-workers/src/main/java/io/dataline/workers/singer/postgres_tap/SingerPostgresTapCheckConnectionWorker.java b/dataline-workers/src/main/java/io/dataline/workers/singer/postgres_tap/SingerPostgresTapCheckConnectionWorker.java
index e22ef59df166a..f1213a761c81b 100644
--- a/dataline-workers/src/main/java/io/dataline/workers/singer/postgres_tap/SingerPostgresTapCheckConnectionWorker.java
+++ b/dataline-workers/src/main/java/io/dataline/workers/singer/postgres_tap/SingerPostgresTapCheckConnectionWorker.java
@@ -24,12 +24,12 @@
package io.dataline.workers.singer.postgres_tap;
-import static io.dataline.workers.singer.postgres_tap.PostgresSingerTapConstants.POSTGRES_SINGER_TAP;
-
import io.dataline.workers.singer.SingerCheckConnectionWorker;
+import io.dataline.workers.singer.SingerTap;
public class SingerPostgresTapCheckConnectionWorker extends SingerCheckConnectionWorker {
+
public SingerPostgresTapCheckConnectionWorker() {
- super(POSTGRES_SINGER_TAP);
+ super(SingerTap.POSTGRES);
}
}
diff --git a/dataline-workers/src/main/java/io/dataline/workers/singer/postgres_tap/SingerPostgresTapDiscoverWorker.java b/dataline-workers/src/main/java/io/dataline/workers/singer/postgres_tap/SingerPostgresTapDiscoverWorker.java
index 95057a8371600..43c5e7a706ff4 100644
--- a/dataline-workers/src/main/java/io/dataline/workers/singer/postgres_tap/SingerPostgresTapDiscoverWorker.java
+++ b/dataline-workers/src/main/java/io/dataline/workers/singer/postgres_tap/SingerPostgresTapDiscoverWorker.java
@@ -24,12 +24,12 @@
package io.dataline.workers.singer.postgres_tap;
-import static io.dataline.workers.singer.postgres_tap.PostgresSingerTapConstants.POSTGRES_SINGER_TAP;
-
import io.dataline.workers.singer.SingerDiscoveryWorker;
+import io.dataline.workers.singer.SingerTap;
public class SingerPostgresTapDiscoverWorker extends SingerDiscoveryWorker {
+
public SingerPostgresTapDiscoverWorker() {
- super(POSTGRES_SINGER_TAP);
+ super(SingerTap.POSTGRES);
}
}
diff --git a/dataline-workers/src/test/java/io/dataline/workers/PostgreSQLContainerHelper.java b/dataline-workers/src/test/java/io/dataline/workers/PostgreSQLContainerHelper.java
index f4a98489734ac..2415b572f964f 100644
--- a/dataline-workers/src/test/java/io/dataline/workers/PostgreSQLContainerHelper.java
+++ b/dataline-workers/src/test/java/io/dataline/workers/PostgreSQLContainerHelper.java
@@ -36,7 +36,7 @@ public static String getSingerConfigJson(PostgreSQLContainer db) throws JsonProc
return getSingerConfigJson(
db.getUsername(),
db.getPassword(),
- db.getHost(),
+ "host.docker.internal",
db.getDatabaseName(),
String.valueOf(db.getFirstMappedPort()));
}
diff --git a/dataline-workers/src/test/java/io/dataline/workers/singer/SingerCheckConnectionWorkerTest.java b/dataline-workers/src/test/java/io/dataline/workers/singer/SingerCheckConnectionWorkerTest.java
index 1cfe2da2192ac..608d7db89cbd2 100644
--- a/dataline-workers/src/test/java/io/dataline/workers/singer/SingerCheckConnectionWorkerTest.java
+++ b/dataline-workers/src/test/java/io/dataline/workers/singer/SingerCheckConnectionWorkerTest.java
@@ -68,9 +68,10 @@ public void testNonexistentDb()
final Object o = new ObjectMapper().readValue(fakeDbCreds, Object.class);
connectionImplementation.setConfiguration(o);
- SingerCheckConnectionWorker worker = new SingerCheckConnectionWorker(SINGER_POSTGRES_TAP_PATH);
+ SingerCheckConnectionWorker worker = new SingerCheckConnectionWorker(SingerTap.POSTGRES);
OutputAndStatus run =
- worker.run(connectionImplementation, createWorkspacePath(jobId).toString());
+ worker.run(connectionImplementation, createWorkspacePath(jobId));
+
assertEquals(FAILED, run.getStatus());
assertTrue(run.getOutput().isPresent());
assertEquals(StandardConnectionStatus.Status.FAILURE, run.getOutput().get().getStatus());
@@ -91,14 +92,15 @@ public void testIncorrectAuthCredentials()
db.getDatabaseName(),
db.getFirstMappedPort() + "");
- SingerCheckConnectionWorker worker = new SingerCheckConnectionWorker(SINGER_POSTGRES_TAP_PATH);
+ SingerCheckConnectionWorker worker = new SingerCheckConnectionWorker(SingerTap.POSTGRES);
final ConnectionImplementation connectionImplementation = new ConnectionImplementation();
final Object o = new ObjectMapper().readValue(incorrectCreds, Object.class);
connectionImplementation.setConfiguration(o);
OutputAndStatus run =
- worker.run(connectionImplementation, createWorkspacePath(jobId).toString());
+ worker.run(connectionImplementation, createWorkspacePath(jobId));
+
assertEquals(FAILED, run.getStatus());
assertTrue(run.getOutput().isPresent());
assertEquals(StandardConnectionStatus.Status.FAILURE, run.getOutput().get().getStatus());
@@ -117,9 +119,10 @@ public void testSuccessfulConnection()
final Object o = new ObjectMapper().readValue(creds, Object.class);
connectionImplementation.setConfiguration(o);
- SingerCheckConnectionWorker worker = new SingerCheckConnectionWorker(SINGER_POSTGRES_TAP_PATH);
+ SingerCheckConnectionWorker worker = new SingerCheckConnectionWorker(SingerTap.POSTGRES);
OutputAndStatus run =
- worker.run(connectionImplementation, createWorkspacePath(jobId).toString());
+ worker.run(connectionImplementation, createWorkspacePath(jobId));
+
assertEquals(SUCCESSFUL, run.getStatus());
assertTrue(run.getOutput().isPresent());
assertEquals(StandardConnectionStatus.Status.SUCCESS, run.getOutput().get().getStatus());
diff --git a/dataline-workers/src/test/java/io/dataline/workers/singer/SingerDiscoveryWorkerTest.java b/dataline-workers/src/test/java/io/dataline/workers/singer/SingerDiscoveryWorkerTest.java
index b0061d7543122..13160eb2fec9b 100644
--- a/dataline-workers/src/test/java/io/dataline/workers/singer/SingerDiscoveryWorkerTest.java
+++ b/dataline-workers/src/test/java/io/dataline/workers/singer/SingerDiscoveryWorkerTest.java
@@ -73,10 +73,11 @@ public void testPostgresDiscovery() throws IOException {
final Object o = new ObjectMapper().readValue(postgresCreds, Object.class);
connectionImplementation.setConfiguration(o);
- SingerDiscoveryWorker worker = new SingerDiscoveryWorker(SINGER_POSTGRES_TAP_PATH);
+ SingerDiscoveryWorker worker = new SingerDiscoveryWorker(SingerTap.POSTGRES);
OutputAndStatus run =
- worker.run(connectionImplementation, createWorkspacePath(jobId).toString());
+ worker.run(connectionImplementation, createWorkspacePath(jobId));
+
assertEquals(SUCCESSFUL, run.getStatus());
String expectedSchema = readResource("simple_postgres_schema.json");
@@ -91,21 +92,23 @@ public void testPostgresDiscovery() throws IOException {
public void testCancellation() throws IOException, InterruptedException, ExecutionException {
final String jobId = "1";
String postgresCreds = PostgreSQLContainerHelper.getSingerConfigJson(db);
+
final ConnectionImplementation connectionImplementation = new ConnectionImplementation();
final Object o = new ObjectMapper().readValue(postgresCreds, Object.class);
connectionImplementation.setConfiguration(o);
- SingerDiscoveryWorker worker = new SingerDiscoveryWorker(SINGER_POSTGRES_TAP_PATH);
+ SingerDiscoveryWorker worker = new SingerDiscoveryWorker(SingerTap.POSTGRES);
+
ExecutorService threadPool = Executors.newFixedThreadPool(2);
Future> workerWasCancelled =
threadPool.submit(
() -> {
OutputAndStatus output =
- worker.run(connectionImplementation, createWorkspacePath(jobId).toString());
+ worker.run(connectionImplementation, createWorkspacePath(jobId));
assertEquals(FAILED, output.getStatus());
});
- TimeUnit.MILLISECONDS.sleep(100);
+ TimeUnit.MILLISECONDS.sleep(50);
worker.cancel();
workerWasCancelled.get();
}
diff --git a/settings.gradle b/settings.gradle
index edbf9aacd3993..733ccb125f4cf 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -9,3 +9,4 @@ include 'dataline-db'
include 'dataline-scheduler'
include 'dataline-server'
include 'dataline-workers'
+
diff --git a/tools/integrations/manage.sh b/tools/integrations/manage.sh
new file mode 100755
index 0000000000000..d7de8ff600ba7
--- /dev/null
+++ b/tools/integrations/manage.sh
@@ -0,0 +1,52 @@
+#!/usr/bin/env sh
+
+set -e
+
+. tools/lib/lib.sh
+
+DOCKER_ORG=${DOCKER_ORG:-dataline}
+
+get_name() {
+ local name=$(dirname $1 | sed "s/^dataline-integrations/integration/" | tr / -)
+ echo "${DOCKER_ORG}/$name"
+}
+
+cmd_build() {
+ local path=$1
+
+ echo "Building $path"
+ docker build -f "$path" -t "$(get_name $path)" "$(dirname "$path")" | grep "Successfully tagged"
+}
+
+cmd_push() {
+ local name=$(get_name "$1")
+
+ echo "Pushing $name"
+ docker push "$name"
+}
+
+cmd_publish() {
+ local path=$1
+
+ cmd_build "$path"
+ cmd_push "$path"
+}
+
+main() {
+ assert_root
+
+ local cmd=$1
+ shift || error "Missing cmd"
+ local path=$1
+ shift || error "Missing target (root path of integration or 'all')"
+
+ if [[ $path == "all" ]]; then
+ for path in $(find dataline-integrations -iname "Dockerfile" -type f); do
+ cmd_$cmd $path
+ done
+ else
+ cmd_$cmd $path
+ fi
+}
+
+main "$@"