Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP named pipe / socat sidecar kube port forwarding #3518

Merged
merged 3 commits into from
May 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ Todo:
- Use an init container to create the initial named pipe
- Use a shared mount
- Test message sizes / etc
- Error code propagation
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# Debian-based image whose entrypoint is the run.sh script copied to /tmp.
FROM debian:latest
COPY run.sh /tmp/run.sh
# Mirrors the entrypoint path in an environment variable so it can be read from inside a
# running container: KubeProcessBuilderFactory.getCommandFromImage echoes $AIRBYTE_ENTRYPOINT
# in a pod started from the image to discover the command to run.
ENV AIRBYTE_ENTRYPOINT="/tmp/run.sh"
ENTRYPOINT /tmp/run.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# Alpine-based image whose entrypoint is the run.sh script copied to /tmp.
FROM alpine:latest
COPY run.sh /tmp/run.sh
# Mirrors the entrypoint path in an environment variable so it can be read from inside a
# running container: KubeProcessBuilderFactory.getCommandFromImage echoes $AIRBYTE_ENTRYPOINT
# in a pod started from the image to discover the command to run.
ENV AIRBYTE_ENTRYPOINT="/tmp/run.sh"
ENTRYPOINT /tmp/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ private void cleanupZombies(JobPersistence jobPersistence, JobNotifier jobNotifi

private static ProcessBuilderFactory getProcessBuilderFactory(Configs configs) {
if (configs.getWorkerEnvironment() == Configs.WorkerEnvironment.KUBERNETES) {
return new KubeProcessBuilderFactory(configs.getWorkspaceRoot());
return new KubeProcessBuilderFactory();
} else {
return new DockerProcessBuilderFactory(
configs.getWorkspaceRoot(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.workers.WorkerException;
Expand All @@ -54,41 +55,49 @@
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KubeProcessBuilderFactory implements ProcessBuilderFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(KubeProcessBuilderFactory.class);
private final String resourceName;

private static final Path WORKSPACE_MOUNT_DESTINATION = Path.of("/workspace");

private final Path workspaceRoot;
/**
 * Creates a factory with no pod template resource configured.
 *
 * NOTE(review): resourceName stays null here and create(...) hands it directly to
 * MoreResources.readResource -- presumably that fails for null; confirm before relying on
 * this constructor.
 */
public KubeProcessBuilderFactory() {
this.resourceName = null; // todo: somehow make the different types of processes configurable
}

public KubeProcessBuilderFactory(Path workspaceRoot) {
this.workspaceRoot = workspaceRoot;
/**
 * Creates a factory that renders pods from the given template resource.
 *
 * @param resourceName name of the resource holding the pod template; create(...) loads it
 *        with MoreResources.readResource
 */
public KubeProcessBuilderFactory(String resourceName) {
this.resourceName = resourceName;
}

@Override
public ProcessBuilder create(String jobId, int attempt, final Path jobRoot, final String imageName, final String entrypoint, final String... args)
throws WorkerException {

try {
final String template = MoreResources.readResource("kube_runner_template.yaml");
final String template = MoreResources.readResource(resourceName);

// used to differentiate source and destination processes with the same id and attempt
final String suffix = RandomStringUtils.randomAlphabetic(5).toLowerCase();

ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory());

String command = getCommandFromImage(imageName);
LOGGER.info("Using entrypoint from image: " + command);

final String rendered = template.replaceAll("JOBID", jobId)
.replaceAll("ATTEMPTID", String.valueOf(attempt))
.replaceAll("IMAGE", imageName)
.replaceAll("SUFFIX", suffix)
.replaceAll("COMMAND", command)
.replaceAll("ARGS", Jsons.serialize(Arrays.asList(args)))
.replaceAll("WORKDIR", jobRoot.toString());

Expand All @@ -102,7 +111,7 @@ public ProcessBuilder create(String jobId, int attempt, final Path jobRoot, fina
"kubectl",
"run",
"--generator=run-pod/v1",
"--rm",
// "--rm", todo: add this back in
"-i",
"--pod-running-timeout=24h",
"--image=" + imageName,
Expand All @@ -118,43 +127,69 @@ public ProcessBuilder create(String jobId, int attempt, final Path jobRoot, fina
}
}

public static void main(String[] args) throws IOException, ApiException, InterruptedException {
var PORT = 9000;
String IP = null;
var destPodName = "destination-listen-and-echo";
KubernetesClient client = new DefaultKubernetesClient();

// Load spec and create the pod.
var stream = KubeProcessBuilderFactory.class.getClassLoader().getResourceAsStream("destination-listen-and-echo.yaml");
var destPodDef = client.pods().load(stream).get();
LOGGER.info("Loaded spec: {}", destPodDef);

var podSet = client.pods().inNamespace("default").list().getItems().stream()
.filter(pod -> pod.getMetadata().getName().equals(destPodName)).collect(Collectors.toSet());
if (podSet.size() == 0) {
LOGGER.info("Pod does not exist");
Pod destPod = client.pods().create(destPodDef); // watch command?
LOGGER.info("Created pod: {}, waiting for it to be ready", destPod);
client.resource(destPod).waitUntilReady(1, TimeUnit.MINUTES);
LOGGER.info("Dest Pod ready");
/**
 * Discovers the command an image expects to be run with. Starts a throwaway pod from
 * {@code imageName} through {@code kubectl run} and has it echo the image's
 * {@code AIRBYTE_ENTRYPOINT} environment variable, then parses that line from kubectl's stdout.
 *
 * todo: this should really be cached
 *
 * @param imageName image to inspect
 * @return the text following {@code AIRBYTE_ENTRYPOINT=} on the first output line mentioning it
 * @throws IOException if kubectl cannot be started or its output cannot be read
 * @throws RuntimeException if no line mentions AIRBYTE_ENTRYPOINT, or the variable is unset/blank
 */
private static String getCommandFromImage(String imageName) throws IOException {
  final String suffix = RandomStringUtils.randomAlphabetic(5).toLowerCase();

  final String podName = "airbyte-command-fetcher-" + suffix;

  final List<String> cmd =
      Lists.newArrayList(
          "kubectl",
          "run",
          "--generator=run-pod/v1",
          "--rm",
          "-i",
          "--pod-running-timeout=24h",
          "--image=" + imageName,
          "--command=true",
          "--restart=Never",
          podName,
          "--",
          "sh",
          "-c",
          "echo \"AIRBYTE_ENTRYPOINT=$AIRBYTE_ENTRYPOINT\"");

  // NOTE(review): the kubectl process is neither waited on nor destroyed here; only its stdout
  // is consumed and closed.
  final Process start = new ProcessBuilder(cmd).start();

  try (BufferedReader reader = IOs.newBufferedReader(start.getInputStream())) {
    String line;
    while ((line = reader.readLine()) != null) {
      if (!line.contains("AIRBYTE_ENTRYPOINT")) {
        // kubectl may print unrelated lines before the echo output
        continue;
      }

      // The first matching line decides the outcome, as before.
      final String[] splits = line.split("=", 2);
      // An unset variable is echoed as "AIRBYTE_ENTRYPOINT=", which splits into two parts with
      // an empty value; treat that as missing instead of returning an empty command.
      if (splits.length == 2 && !splits[1].isBlank()) {
        return splits[1];
      }
      break;
    }
  }

  throw new RuntimeException("Unable to read AIRBYTE_ENTRYPOINT from the image. Make sure this environment variable is set in the Dockerfile!");
}

Pod destPod = client.pods().inNamespace("default").withName(destPodName).get();
LOGGER.info("Found IP!");
LOGGER.info("Status: {}", destPod.getStatus());
LOGGER.info("IP: {}", destPod.getStatus().getPodIP());
IP = destPod.getStatus().getPodIP();

// Send something!
var clientSocket = new Socket(IP, PORT);
var out = new PrintWriter(clientSocket.getOutputStream(), true);
out.print("Hello!");
out.close();

// client.pods().delete(destPodDef);
// // TODO: Why does this wait not work?
// client.resource(destPodDef).waitUntilCondition(pod -> !pod.getStatus().getPhase().equals("Terminating"), 1, TimeUnit.MINUTES);
client.close();
/**
 * Manual smoke test: renders a pod from {@code stdout_template.yaml} for job "0", attempt 0,
 * starts the resulting process, exchanges one byte with it and logs its exit code.
 *
 * @param args unused
 */
public static void main(String[] args) {
  try {
    // todo: test this with args that are used by the process
    // create(...) declares jobId as a String, so the id is passed as "0" rather than 0L.
    final Process process = new KubeProcessBuilderFactory("stdout_template.yaml")
        .create("0", 0, Path.of("/tmp"), "np_source:dev", null)
        .start();

    process.getOutputStream().write(100);
    // The stream to the child's stdin is buffered; without a flush the byte is never delivered.
    process.getOutputStream().flush();
    process.getInputStream().read();

    // after running this main:
    // kubectl port-forward airbyte-worker-0-0-fmave 9000:9000
    // socat -d -d -d TCP-LISTEN:9000,bind=127.0.0.1 stdout

    LOGGER.info("waiting...");
    final int code = process.waitFor();
    LOGGER.info("code = " + code);
  } catch (InterruptedException e) {
    // restore the interrupt flag so the interruption is not lost
    Thread.currentThread().interrupt();
    LOGGER.error("Interrupted while waiting for the kube process", e);
  } catch (Exception e) {
    // log through the logger with the full stack trace instead of printStackTrace()
    LOGGER.error("Kube process smoke test failed", e);
  }
}

}
Empty file.
30 changes: 0 additions & 30 deletions airbyte-workers/src/main/resources/kube_runner_template.yaml

This file was deleted.

43 changes: 43 additions & 0 deletions airbyte-workers/src/main/resources/stdin_stdout_template.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Pod template for a worker whose stdin and stdout are routed through named pipes on a shared
# emptyDir volume. JOBID, ATTEMPTID, SUFFIX, IMAGE, WORKDIR, COMMAND and ARGS are placeholders
# substituted by KubeProcessBuilderFactory.create before the pod is started.
apiVersion: v1
kind: Pod
metadata:
  name: airbyte-worker-JOBID-ATTEMPTID-SUFFIX
spec:
  restartPolicy: Never
  initContainers:
    # Creates both FIFOs in the shared volume before the main containers start.
    - name: init
      image: busybox:1.28
      command: [ 'sh', '-c', "mkfifo /pipes/stdin && mkfifo /pipes/stdout" ]
      volumeMounts:
        - name: airbyte-pipes
          mountPath: /pipes
  containers:
    - name: worker
      image: IMAGE
      workingDir: WORKDIR
      # NOTE(review): no container in this template reads /pipes/stdout -- confirm who consumes it.
      command: [ 'sh', '-c', "cat /pipes/stdin | COMMAND > /pipes/stdout" ]
      # NOTE(review): with `sh -c <script>` as the command, args are appended after the script,
      # so the shell sees them as $0, $1, ... and COMMAND itself does not receive them -- confirm.
      args: ARGS
      volumeMounts:
        # - name: airbyte-volume-workspace
        #   mountPath: /workspace
        - name: airbyte-pipes
          mountPath: /pipes
    - name: socat
      image: alpine/socat:1.7.4.1-r1
      # Listens on TCP 9001 and writes whatever it receives into the stdin FIFO.
      command: [ 'sh', '-c', "socat -d -d -d - TCP-L:9001 > /pipes/stdin" ]
      env:
        - name: POD_IP
          valueFrom:
            fieldRef:
              fieldPath: status.podIP
      ports:
        # Declared port now matches the port socat actually listens on (was 9000).
        - containerPort: 9001
      volumeMounts:
        - name: airbyte-pipes
          mountPath: /pipes
  volumes:
    # - name: airbyte-volume-workspace
    #   persistentVolumeClaim:
    #     claimName: airbyte-volume-workspace
    - name: airbyte-pipes
      emptyDir: {}
43 changes: 43 additions & 0 deletions airbyte-workers/src/main/resources/stdout_template.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Pod template for a worker whose stdout is written to a named pipe and shipped out of the pod
# by a socat sidecar. JOBID, ATTEMPTID, SUFFIX, IMAGE, WORKDIR, COMMAND and ARGS are
# placeholders substituted by KubeProcessBuilderFactory.create.
apiVersion: v1
kind: Pod
metadata:
  name: airbyte-worker-JOBID-ATTEMPTID-SUFFIX
spec:
  restartPolicy: Never
  initContainers:
    # Prepares the FIFOs on the shared volume before the main containers run.
    - name: init
      image: busybox:1.28
      command:
        - "sh"
        - "-c"
        - "mkfifo /pipes/stdin && mkfifo /pipes/stdout"
      volumeMounts:
        - name: airbyte-pipes
          mountPath: /pipes
  containers:
    - name: worker
      image: IMAGE
      workingDir: WORKDIR
      command:
        - "sh"
        - "-c"
        - "COMMAND > /pipes/stdout"
      args: ARGS
      volumeMounts:
        # - name: airbyte-volume-workspace
        #   mountPath: /workspace
        - name: airbyte-pipes
          mountPath: /pipes
    - name: socat
      image: alpine/socat:1.7.4.1-r1
      # Streams the stdout FIFO to a TCP peer.
      # todo: pass in the sync worker ip
      command:
        - "sh"
        - "-c"
        - "cat /pipes/stdout | socat -d -d -d - TCP:host.docker.internal:9000"
      env:
        - name: POD_IP
          valueFrom:
            fieldRef:
              fieldPath: status.podIP
      ports:
        - containerPort: 9000
      volumeMounts:
        - name: airbyte-pipes
          mountPath: /pipes
  volumes:
    # - name: airbyte-volume-workspace
    #   persistentVolumeClaim:
    #     claimName: airbyte-volume-workspace
    - name: airbyte-pipes
      emptyDir: {}