Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use docker for integrations #87

Merged
merged 31 commits into from
Aug 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
de39abb
progress
michel-tricot Aug 17, 2020
ebb8fff
progress
michel-tricot Aug 18, 2020
609708a
Merge branch 'master' into mt-isolate-singer
michel-tricot Aug 18, 2020
c531124
progress
michel-tricot Aug 18, 2020
23f02b9
cleanup
michel-tricot Aug 18, 2020
a05dd54
rename file
michel-tricot Aug 18, 2020
a14a8d9
add destination
michel-tricot Aug 18, 2020
845eb4e
Merge branch 'master' into mt-isolate-singer
michel-tricot Aug 18, 2020
8428f49
progress
michel-tricot Aug 18, 2020
5acc06f
isolate workdir
michel-tricot Aug 18, 2020
b639ad5
Build & push
michel-tricot Aug 18, 2020
dd57a59
Merge branch 'master' into mt-isolate-singer
michel-tricot Aug 18, 2020
4346bc3
progress
michel-tricot Aug 19, 2020
4a163c8
Merge branch 'master' into mt-isolate-singer
michel-tricot Aug 19, 2020
ea715a9
working stdin
michel-tricot Aug 19, 2020
a68006c
progress
michel-tricot Aug 20, 2020
a08e540
still having issues
michel-tricot Aug 20, 2020
591e362
still not working
michel-tricot Aug 20, 2020
6105c10
different approach
michel-tricot Aug 20, 2020
0fceca3
Merge branch 'master' into mt-isolate-singer
michel-tricot Aug 20, 2020
a739968
migrate to docker
michel-tricot Aug 20, 2020
368c631
fix host
michel-tricot Aug 20, 2020
4df0733
fix build
michel-tricot Aug 20, 2020
163f47a
fix conflicts
michel-tricot Aug 21, 2020
9250787
install docker cli
michel-tricot Aug 21, 2020
e0b4397
install docker cli
michel-tricot Aug 21, 2020
07893fc
iterate
michel-tricot Aug 21, 2020
18e7590
iterate
michel-tricot Aug 21, 2020
f7bf7a2
attempt to fix
michel-tricot Aug 21, 2020
2bd5013
Merge branch 'master' into mt-isolate-singer
michel-tricot Aug 21, 2020
692d9b1
remove test
michel-tricot Aug 21, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .github/workflows/gradle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,3 @@ jobs:
- name: Test
run: ./tools/app/test.sh



46 changes: 25 additions & 21 deletions dataline-commons/src/main/resources/log4j2.xml
Original file line number Diff line number Diff line change
@@ -1,29 +1,33 @@
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO">

<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n"/>
</Console>
</Appenders>
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n"/>
</Console>
</Appenders>

<Loggers>
<Root level="TRACE">
<AppenderRef ref="Console"/>
</Root>
<Loggers>
<Root level="TRACE">
<AppenderRef ref="Console"/>
</Root>

<Logger name="org.jooq.tools.LoggerListener" level="INFO">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="org.eclipse.jetty" level="INFO" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="com.github.dockerjava" level="INFO" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="org.apache.hc" level="INFO" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="org.jooq.tools.LoggerListener" level="INFO">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="org.jooq" level="INFO">
<AppenderRef ref="Console"/>
</Logger>

<Logger name="org.jooq" level="INFO">
<AppenderRef ref="Console"/>
</Logger>

<Logger name="org.eclipse.jetty" level="INFO" additivity="false">
<AppenderRef ref="Console"/>
</Logger>

</Loggers>
</Loggers>

</Configuration>
16 changes: 16 additions & 0 deletions dataline-integrations/singer/csv/destination/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
FROM python:3.7-slim

WORKDIR /singer

ENV VIRTUAL_ENV=/singer/env
RUN python -m venv $VIRTUAL_ENV
ENV PATH="$VIRTUAL_ENV/bin:$PATH"

# Install dependencies:
COPY requirements.txt .
RUN python -m pip install --upgrade pip && \
pip install -r requirements.txt

WORKDIR /singer/data

ENTRYPOINT ["target-csv"]
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
target-csv == 0.3.0
16 changes: 16 additions & 0 deletions dataline-integrations/singer/exchangerateapi_io/source/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
FROM python:3.7-slim

WORKDIR /singer

ENV VIRTUAL_ENV=/singer/env
RUN python -m venv $VIRTUAL_ENV
ENV PATH="$VIRTUAL_ENV/bin:$PATH"

# Install dependencies:
COPY requirements.txt .
RUN python -m pip install --upgrade pip && \
pip install -r requirements.txt

WORKDIR /singer/data

ENTRYPOINT ["tap-exchangeratesapi"]
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
tap-exchangeratesapi == 0.1.1
16 changes: 16 additions & 0 deletions dataline-integrations/singer/postgres/destination/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
FROM python:3.7-slim

WORKDIR /singer

ENV VIRTUAL_ENV=/singer/env
RUN python -m venv $VIRTUAL_ENV
ENV PATH="$VIRTUAL_ENV/bin:$PATH"

# Install dependencies:
COPY requirements.txt .
RUN python -m pip install --upgrade pip && \
pip install -r requirements.txt

WORKDIR /singer/data

ENTRYPOINT ["target-postgres"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
psycopg2-binary == 2.8.5
target-postgres == 1.1.3
22 changes: 22 additions & 0 deletions dataline-integrations/singer/postgres/source/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
FROM python:3.7-slim

WORKDIR /singer

ENV VIRTUAL_ENV=/singer/env
RUN python -m venv $VIRTUAL_ENV
ENV PATH="$VIRTUAL_ENV/bin:$PATH"

# need gcc to compile psycopg2
RUN apt-get update && \
apt-get install -y libpq-dev gcc

# Install dependencies:
COPY requirements.txt .
RUN python -m pip install --upgrade pip && \
pip install -r requirements.txt

RUN apt-get autoremove -y gcc

WORKDIR /singer/data

ENTRYPOINT ["tap-postgres"]
2 changes: 2 additions & 0 deletions dataline-integrations/singer/postgres/source/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
psycopg2 == 2.7.4
tap-postgres == 0.1.0
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.slf4j.LoggerFactory;

public class JobSubmitter implements Runnable {

private static final Logger LOGGER = LoggerFactory.getLogger(JobSubmitter.class);

private final ExecutorService threadPool;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.slf4j.LoggerFactory;

public class WorkerWrapper<InputType, OutputType> implements Runnable {

private static final Logger LOGGER = LoggerFactory.getLogger(WorkerWrapper.class);

private final long jobId;
Expand Down Expand Up @@ -106,7 +107,7 @@ public void run() {
FileUtils.forceMkdir(workspacesRoot.toFile());
final Path workspaceRoot = workspacesRoot.resolve(String.valueOf(jobId));
FileUtils.forceMkdir(workspaceRoot.toFile());
OutputAndStatus<OutputType> outputAndStatus = worker.run(input, workspaceRoot.toString());
OutputAndStatus<OutputType> outputAndStatus = worker.run(input, workspaceRoot);

switch (outputAndStatus.getStatus()) {
case FAILED:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* MIT License
*
* Copyright (c) 2020 Dataline
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.dataline.workers;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.lang.ProcessBuilder.Redirect;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DockerProcessRunner {

private static final Logger LOGGER = LoggerFactory.getLogger(DockerProcessRunner.class);

public static void main(String[] args) throws IOException, InterruptedException {
ProcessBuilder processBuilderTap =
new ProcessBuilder("docker", "run", "dataline/integration-singer-exchangerateapi_io-source")
.redirectError(Redirect.INHERIT);

ProcessBuilder processBuilderTarget =
new ProcessBuilder(
"docker",
"run",
"-i",
"-v",
"/tmp/singer:/singer/data",
"dataline/integration-singer-csv-destination")
.redirectError(Redirect.INHERIT)
.redirectOutput(Redirect.INHERIT);

List<Process> processes =
ProcessBuilder.startPipeline(Lists.newArrayList(processBuilderTap, processBuilderTarget));

for (Process process : processes) {
process.waitFor();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

package io.dataline.workers;

import java.nio.file.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -33,7 +34,7 @@ public class EchoWorker implements Worker<String, String> {
public EchoWorker() {}

@Override
public OutputAndStatus<String> run(String string, String workspaceRoot) {
public OutputAndStatus<String> run(String string, Path workspaceRoot) {
LOGGER.info("Hello World. input: {}, workspace root: {}", string, workspaceRoot);
return new OutputAndStatus<>(JobStatus.SUCCESSFUL, "echoed");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@

package io.dataline.workers;

import java.nio.file.Path;

public interface Worker<InputType, OutputType> {
/**
* Blocking call to run the worker's workflow. Once this is complete, getStatus should return
* either COMPLETE, FAILED, or CANCELLED.
*/
OutputAndStatus<OutputType> run(InputType inputType, String workspacePath)
OutputAndStatus<OutputType> run(InputType inputType, Path workspacePath)
throws InvalidCredentialsException, InvalidCatalogException;

void cancel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,38 +27,37 @@
import io.dataline.workers.JobStatus;
import io.dataline.workers.OutputAndStatus;
import io.dataline.workers.Worker;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseSingerWorker<InputType, OutputType>
implements Worker<InputType, OutputType> {

private static final Logger LOGGER = LoggerFactory.getLogger(BaseSingerWorker.class);

protected JobStatus jobStatus;
protected final SingerConnector connector;

private final String singerExecutablePath;
private JobStatus jobStatus;

protected BaseSingerWorker(String singerExecutablePath) {
this.singerExecutablePath = singerExecutablePath;
protected BaseSingerWorker(SingerConnector connector) {
this.connector = connector;
}

@Override
public OutputAndStatus<OutputType> run(InputType inputType, String workspaceRoot) {
public OutputAndStatus<OutputType> run(InputType inputType, Path workspaceRoot) {
return runInternal(inputType, workspaceRoot);
}

abstract OutputAndStatus<OutputType> runInternal(InputType inputType, String workspaceRoot);
abstract OutputAndStatus<OutputType> runInternal(InputType inputType, Path workspaceRoot);

protected void cancelHelper(Process workerProcess) {
try {
jobStatus = JobStatus.FAILED;
updateJobStatus(JobStatus.FAILED);
workerProcess.destroy();
workerProcess.waitFor(10, TimeUnit.SECONDS);
if (workerProcess.isAlive()) {
Expand All @@ -69,30 +68,30 @@ protected void cancelHelper(Process workerProcess) {
}
}

protected String readFileFromWorkspace(String workspaceRoot, String fileName) {
try (FileReader fileReader = new FileReader(getWorkspaceFilePath(workspaceRoot, fileName));
BufferedReader br = new BufferedReader(fileReader)) {
return br.lines().collect(Collectors.joining("\n"));
protected static String readFile(Path workspaceRoot, String fileName) {
try {
Path filePath = workspaceRoot.resolve(fileName);
return FileUtils.readFileToString(filePath.toFile(), StandardCharsets.UTF_8);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

protected String writeFileToWorkspace(String workspaceRoot, String fileName, String contents) {
String filePath = getWorkspaceFilePath(workspaceRoot, fileName);
try (FileWriter fileWriter = new FileWriter(filePath)) {
fileWriter.write(contents);
protected static Path writeFile(Path workspaceRoot, String fileName, String contents) {
try {
Path filePath = workspaceRoot.resolve(fileName);
FileUtils.writeStringToFile(filePath.toFile(), contents, StandardCharsets.UTF_8);
return filePath;
} catch (IOException e) {
throw new RuntimeException(e);
}
}

protected String getExecutableAbsolutePath() {
return singerExecutablePath;
protected static Path getFullPath(Path workspaceRoot, String fileName) {
return workspaceRoot.resolve(fileName);
}

private String getWorkspaceFilePath(String workspaceRoot, String fileName) {
return Path.of(workspaceRoot).resolve(fileName).toAbsolutePath().toString();
protected void updateJobStatus(JobStatus jobStatus) {
this.jobStatus = jobStatus;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,26 @@
import io.dataline.workers.CheckConnectionWorker;
import io.dataline.workers.JobStatus;
import io.dataline.workers.OutputAndStatus;
import java.nio.file.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SingerCheckConnectionWorker
extends BaseSingerWorker<ConnectionImplementation, StandardConnectionStatus>
implements CheckConnectionWorker {

private static final Logger LOGGER = LoggerFactory.getLogger(SingerCheckConnectionWorker.class);

private final SingerDiscoveryWorker singerDiscoveryWorker;

public SingerCheckConnectionWorker(String singerExecutablePath) {
super(singerExecutablePath);
this.singerDiscoveryWorker = new SingerDiscoveryWorker(singerExecutablePath);
public SingerCheckConnectionWorker(SingerConnector connector) {
super(connector);
this.singerDiscoveryWorker = new SingerDiscoveryWorker(connector);
}

@Override
OutputAndStatus<StandardConnectionStatus> runInternal(
ConnectionImplementation connectionImplementation, String workspaceRoot) {
ConnectionImplementation connectionImplementation, Path workspaceRoot) {
OutputAndStatus<StandardDiscoveryOutput> outputAndStatus =
singerDiscoveryWorker.runInternal(connectionImplementation, workspaceRoot);
StandardConnectionStatus connectionStatus = new StandardConnectionStatus();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,5 @@
package io.dataline.workers.singer;

interface SingerConnector {
String getPythonVirtualEnvName();

String getExecutableName();
String getImageName();
}
Loading