Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use docker for integrations #87

Merged
merged 31 commits into from
Aug 21, 2020
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
de39abb
progress
michel-tricot Aug 17, 2020
ebb8fff
progress
michel-tricot Aug 18, 2020
609708a
Merge branch 'master' into mt-isolate-singer
michel-tricot Aug 18, 2020
c531124
progress
michel-tricot Aug 18, 2020
23f02b9
cleanup
michel-tricot Aug 18, 2020
a05dd54
rename file
michel-tricot Aug 18, 2020
a14a8d9
add destination
michel-tricot Aug 18, 2020
845eb4e
Merge branch 'master' into mt-isolate-singer
michel-tricot Aug 18, 2020
8428f49
progress
michel-tricot Aug 18, 2020
5acc06f
isolate workdir
michel-tricot Aug 18, 2020
b639ad5
Build & push
michel-tricot Aug 18, 2020
dd57a59
Merge branch 'master' into mt-isolate-singer
michel-tricot Aug 18, 2020
4346bc3
progress
michel-tricot Aug 19, 2020
4a163c8
Merge branch 'master' into mt-isolate-singer
michel-tricot Aug 19, 2020
ea715a9
working stdin
michel-tricot Aug 19, 2020
a68006c
progress
michel-tricot Aug 20, 2020
a08e540
still having issues
michel-tricot Aug 20, 2020
591e362
still not working
michel-tricot Aug 20, 2020
6105c10
different approach
michel-tricot Aug 20, 2020
0fceca3
Merge branch 'master' into mt-isolate-singer
michel-tricot Aug 20, 2020
a739968
migrate to docker
michel-tricot Aug 20, 2020
368c631
fix host
michel-tricot Aug 20, 2020
4df0733
fix build
michel-tricot Aug 20, 2020
163f47a
fix conflicts
michel-tricot Aug 21, 2020
9250787
install docker cli
michel-tricot Aug 21, 2020
e0b4397
install docker cli
michel-tricot Aug 21, 2020
07893fc
iterate
michel-tricot Aug 21, 2020
18e7590
iterate
michel-tricot Aug 21, 2020
f7bf7a2
attempt to fix
michel-tricot Aug 21, 2020
2bd5013
Merge branch 'master' into mt-isolate-singer
michel-tricot Aug 21, 2020
692d9b1
remove test
michel-tricot Aug 21, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 25 additions & 21 deletions dataline-commons/src/main/resources/log4j2.xml
Original file line number Diff line number Diff line change
@@ -1,29 +1,33 @@
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO">

<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n"/>
</Console>
</Appenders>
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n"/>
</Console>
</Appenders>

<Loggers>
<Root level="TRACE">
<AppenderRef ref="Console"/>
</Root>
<Loggers>
<Root level="TRACE">
<AppenderRef ref="Console"/>
</Root>

<Logger name="org.jooq.tools.LoggerListener" level="INFO">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="org.eclipse.jetty" level="INFO" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="com.github.dockerjava" level="INFO" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="org.apache.hc" level="INFO" additivity="false">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="org.jooq.tools.LoggerListener" level="INFO">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="org.jooq" level="INFO">
<AppenderRef ref="Console"/>
</Logger>

<Logger name="org.jooq" level="INFO">
<AppenderRef ref="Console"/>
</Logger>

<Logger name="org.eclipse.jetty" level="INFO" additivity="false">
<AppenderRef ref="Console"/>
</Logger>

</Loggers>
</Loggers>

</Configuration>
16 changes: 16 additions & 0 deletions dataline-integrations/singer/csv/destination/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
FROM python:3.7-slim

WORKDIR /singer

ENV VIRTUAL_ENV=/singer/env
RUN python -m venv $VIRTUAL_ENV
ENV PATH="$VIRTUAL_ENV/bin:$PATH"

# Install dependencies:
COPY requirements.txt .
RUN python -m pip install --upgrade pip && \
pip install -r requirements.txt

WORKDIR /singer/data

ENTRYPOINT ["target-csv"]
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
target-csv == 0.3.0
16 changes: 16 additions & 0 deletions dataline-integrations/singer/exchangerateapi_io/source/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
FROM python:3.7-slim

WORKDIR /singer

ENV VIRTUAL_ENV=/singer/env
RUN python -m venv $VIRTUAL_ENV
ENV PATH="$VIRTUAL_ENV/bin:$PATH"

# Install dependencies:
COPY requirements.txt .
RUN python -m pip install --upgrade pip && \
pip install -r requirements.txt

WORKDIR /singer/data

ENTRYPOINT ["tap-exchangeratesapi"]
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
tap-exchangeratesapi == 0.1.1
16 changes: 16 additions & 0 deletions dataline-integrations/singer/postgres/destination/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
FROM python:3.7-slim

WORKDIR /singer

ENV VIRTUAL_ENV=/singer/env
RUN python -m venv $VIRTUAL_ENV
ENV PATH="$VIRTUAL_ENV/bin:$PATH"

# Install dependencies:
COPY requirements.txt .
RUN python -m pip install --upgrade pip && \
pip install -r requirements.txt

WORKDIR /singer/data

ENTRYPOINT ["target-postgres"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
psycopg2-binary == 2.8.5
target-postgres == 1.1.3
22 changes: 22 additions & 0 deletions dataline-integrations/singer/postgres/source/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
FROM python:3.7-slim

WORKDIR /singer

ENV VIRTUAL_ENV=/singer/env
RUN python -m venv $VIRTUAL_ENV
ENV PATH="$VIRTUAL_ENV/bin:$PATH"

# need gcc to compile psycopg2
RUN apt-get update && \
apt-get install -y libpq-dev gcc

# Install dependencies:
COPY requirements.txt .
RUN python -m pip install --upgrade pip && \
pip install -r requirements.txt

RUN apt-get autoremove -y gcc

WORKDIR /singer/data

ENTRYPOINT ["tap-postgres"]
2 changes: 2 additions & 0 deletions dataline-integrations/singer/postgres/source/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
psycopg2 == 2.7.4
tap-postgres == 0.1.0
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.dataline.workers.singer.SingerDiscoveryWorker;
import io.dataline.workers.singer.SingerTap;
import java.io.IOException;
import java.nio.file.Paths;
import java.sql.SQLException;
import java.time.Instant;
import java.time.LocalDateTime;
Expand All @@ -48,6 +49,7 @@
import org.slf4j.LoggerFactory;

public class JobSubmitter implements Runnable {

private static final Logger LOGGER = LoggerFactory.getLogger(JobSubmitter.class);

private final ExecutorService threadPool;
Expand Down Expand Up @@ -176,11 +178,12 @@ private void handleJob(Job job) {
job.getId(),
new SingerDiscoveryWorker(
"worker-1", // todo: assign worker ids
configString,
Paths.get("/tmp/workspace1"),
tap,
"/usr/local/lib/singer/workspace1", // todo: better path and why are we
configString
// todo: better path and why are we
// scoping by workspace here
"/usr/local/lib/singer/"),
),
connectionPool));
LOGGER.info("Submitting job to thread pool...");
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ public SourceImplementationDiscoverSchemaRead discoverSchemaForSourceImplementat
LOGGER.info("jobId = " + jobId);
final Job job = waitUntilJobIsTerminalOrTimeout(jobId);

final StandardDiscoveryOutput discoveryOutput = job.getOutput()
final StandardDiscoveryOutput discoveryOutput =
job.getOutput()
.orElseThrow(() -> new RuntimeException("Terminal job does not have an output"))
.getDiscoverSchema();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* MIT License
*
* Copyright (c) 2020 Dataline
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.dataline.workers;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.lang.ProcessBuilder.Redirect;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DockerProcessRunner {

private static final Logger LOGGER = LoggerFactory.getLogger(DockerProcessRunner.class);

public static void main(String[] args) throws IOException, InterruptedException {
ProcessBuilder processBuilderTap =
new ProcessBuilder("docker", "run", "dataline/integration-singer-exchangerateapi_io-source")
.redirectError(Redirect.INHERIT);

ProcessBuilder processBuilderTarget =
new ProcessBuilder(
"docker",
"run",
"-i",
"-v",
"/tmp/singer:/singer/data",
"dataline/integration-singer-csv-destination")
.redirectError(Redirect.INHERIT)
.redirectOutput(Redirect.INHERIT);

List<Process> processes =
ProcessBuilder.startPipeline(Lists.newArrayList(processBuilderTap, processBuilderTarget));

for (Process process : processes) {
process.waitFor();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,31 +27,26 @@
import io.dataline.workers.JobStatus;
import io.dataline.workers.OutputAndStatus;
import io.dataline.workers.Worker;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseSingerWorker<OutputType> implements Worker<OutputType> {
private static Logger LOGGER = LoggerFactory.getLogger(BaseSingerWorker.class);

protected JobStatus jobStatus;
protected String workerId;
private static final Logger LOGGER = LoggerFactory.getLogger(BaseSingerWorker.class);

protected final String workerId;
protected final Path workspacePath;

private final String singerRoot;
private JobStatus jobStatus;

protected BaseSingerWorker(String workerId, String workspaceRoot, String singerRoot) {
protected BaseSingerWorker(String workerId, Path workspaceRoot) {
this.workerId = workerId;
this.workspacePath = Path.of(workspaceRoot, workerId);
this.singerRoot = singerRoot;
this.workspacePath = workspaceRoot.resolve(workerId);
}

@Override
Expand All @@ -73,7 +68,7 @@ private void createWorkspace() {

protected void cancelHelper(Process workerProcess) {
try {
jobStatus = JobStatus.FAILED;
updateJobStatus(JobStatus.FAILED);
workerProcess.destroy();
workerProcess.waitFor(10, TimeUnit.SECONDS);
if (workerProcess.isAlive()) {
Expand All @@ -88,36 +83,30 @@ protected Path getWorkspacePath() {
return workspacePath;
}

protected String readFileFromWorkspace(String fileName) {
try (FileReader fileReader = new FileReader(getWorkspaceFilePath(fileName));
BufferedReader br = new BufferedReader(fileReader)) {
return br.lines().collect(Collectors.joining("\n"));
protected String readFile(String fileName) {
try {
Path filePath = getFullPath(fileName);
return FileUtils.readFileToString(filePath.toFile(), StandardCharsets.UTF_8);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

protected String writeFileToWorkspace(String fileName, String contents) {
String filePath = getWorkspaceFilePath(fileName);
try (FileWriter fileWriter = new FileWriter(filePath)) {
fileWriter.write(contents);
protected Path writeFile(String fileName, String contents) {
try {
Path filePath = getFullPath(fileName);
FileUtils.writeStringToFile(filePath.toFile(), contents, StandardCharsets.UTF_8);
return filePath;
} catch (IOException e) {
throw new RuntimeException(e);
}
}

protected String getExecutableAbsolutePath(SingerConnector tapOrTarget) {
return Paths.get(
singerRoot,
tapOrTarget.getPythonVirtualEnvName(),
"bin",
tapOrTarget.getExecutableName())
.toAbsolutePath()
.toString();
protected Path getFullPath(String fileName) {
return getWorkspacePath().resolve(fileName).toAbsolutePath();
}

private String getWorkspaceFilePath(String fileName) {
return getWorkspacePath().resolve(fileName).toAbsolutePath().toString();
protected void updateJobStatus(JobStatus jobStatus) {
this.jobStatus = jobStatus;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,22 @@
import io.dataline.workers.CheckConnectionWorker;
import io.dataline.workers.JobStatus;
import io.dataline.workers.OutputAndStatus;
import java.nio.file.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SingerCheckConnectionWorker extends BaseSingerWorker<StandardConnectionStatus>
implements CheckConnectionWorker {

private static final Logger LOGGER = LoggerFactory.getLogger(SingerCheckConnectionWorker.class);

private final SingerDiscoveryWorker singerDiscoveryWorker;

public SingerCheckConnectionWorker(
String workerId,
SingerTap singerTap,
String configDotJson,
String workspaceRoot,
String singerLibsRoot) {
super(workerId, workspaceRoot, singerLibsRoot);
String workerId, Path workspaceRoot, SingerTap singerTap, String configContent) {
super(workerId, workspaceRoot);
this.singerDiscoveryWorker =
new SingerDiscoveryWorker(
workerId, configDotJson, singerTap, workspaceRoot, singerLibsRoot);
new SingerDiscoveryWorker(workerId, workspaceRoot, singerTap, configContent);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,5 @@
package io.dataline.workers.singer;

interface SingerConnector {
String getPythonVirtualEnvName();

String getExecutableName();
String getImageName();
}
Loading