Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More clean up. #3586

Merged
merged 13 commits into from
May 26, 2021
3 changes: 0 additions & 3 deletions airbyte-workers/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ FROM openjdk:14.0.2-slim

WORKDIR /airbyte

# Just so we can run kubectl for now.
COPY --from=lachlanevenson/k8s-kubectl:v1.10.3 /usr/local/bin/kubectl /usr/local/bin/kubectl

COPY build/distributions/airbyte-workers*.tar airbyte-workers.tar

RUN tar xf airbyte-workers.tar --strip-components=1
Expand Down
11 changes: 10 additions & 1 deletion airbyte-workers/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@ plugins {

application {
mainClass = 'io.airbyte.workers.process.KubeProcessBuilderFactoryPOC'
applicationDefaultJvmArgs = ['-Xmx500m',
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For remote debugging. Will remove before merging into branch. Leaving this here in case someone wants to take a look at what I saw.

Port-forwarding the JMX port to localhost:6000 and attaching VisualVM to port 6000 will confirm that OkHttp is the source of the thread leak.

'-XX:NativeMemoryTracking=detail',
"-Djava.rmi.server.hostname=localhost",
'-Dcom.sun.management.jmxremote=true',
'-Dcom.sun.management.jmxremote.port=6000',
"-Dcom.sun.management.jmxremote.rmi.port=6000",
'-Dcom.sun.management.jmxremote.local.only=false',
'-Dcom.sun.management.jmxremote.authenticate=false',
'-Dcom.sun.management.jmxremote.ssl=false']
}

configurations {
Expand All @@ -30,7 +39,7 @@ dependencies {
implementation project(':airbyte-protocol:models')

testImplementation 'org.mockito:mockito-inline:2.13.0'
testImplementation 'org.testcontainers:testcontainers:1.14.3'
testImplementation 'org.testcontainers:testcontainers:1.15.3'
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was running into intermittent test errors without this.

testImplementation 'org.testcontainers:postgresql:1.15.1'
testImplementation 'org.postgresql:postgresql:42.2.18'
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.output.NullOutputStream;
import org.apache.commons.lang3.RandomStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -63,7 +64,55 @@ public class KubePodProcess extends Process {
private final ServerSocket stdoutServerSocket;
private final ExecutorService executorService;

public KubePodProcess(KubernetesClient client, String podName, String image, int stdoutLocalPort, boolean usesStdin)
// TODO(Davin): Cache this result.
/**
 * Discovers the entrypoint command baked into an image by launching a short-lived pod that echoes
 * the image's AIRBYTE_ENTRYPOINT environment variable and then parsing the value back out of the
 * pod's logs.
 *
 * @param client Kubernetes client used to create the probe pod and read its logs.
 * @param imageName image whose AIRBYTE_ENTRYPOINT should be read.
 * @param namespace namespace in which the probe pod is created.
 * @return the non-empty value of AIRBYTE_ENTRYPOINT.
 * @throws RuntimeException if the echo output is missing or the variable is unset/empty.
 * @throws InterruptedException if interrupted while waiting for the probe pod to complete.
 */
public static String getCommandFromImage(KubernetesClient client, String imageName, String namespace) throws IOException, InterruptedException {
  // Random suffix so concurrent invocations do not collide on pod names.
  final String suffix = RandomStringUtils.randomAlphabetic(5).toLowerCase();

  final String podName = "airbyte-command-fetcher-" + suffix;

  Container commandFetcher = new ContainerBuilder()
      .withName("airbyte-command-fetcher")
      .withImage(imageName)
      .withCommand("sh", "-c", "echo \"AIRBYTE_ENTRYPOINT=$AIRBYTE_ENTRYPOINT\"")
      .build();

  Pod pod = new PodBuilder()
      .withApiVersion("v1")
      .withNewMetadata()
      .withName(podName)
      .endMetadata()
      .withNewSpec()
      // Never restart: the pod runs a single echo and must terminate so the phase reaches Succeeded.
      .withRestartPolicy("Never")
      .withContainers(commandFetcher)
      .endSpec()
      .build();
  LOGGER.info("Creating pod...");
  Pod podDefinition = client.pods().inNamespace(namespace).createOrReplace(pod);
  LOGGER.info("Waiting until command fetcher pod completes...");
  client.resource(podDefinition).waitUntilCondition(p -> p.getStatus().getPhase().equals("Succeeded"), 2, TimeUnit.MINUTES);
  // TODO(review): the probe pod is never deleted here; consider cleaning it up once the logs are read.

  var logs = client.pods().inNamespace(namespace).withName(podName).getLog();
  if (!logs.contains("AIRBYTE_ENTRYPOINT")) {
    throw new RuntimeException("Missing AIRBYTE_ENTRYPOINT from command fetcher logs. This should not happen. Check the echo command has not been changed.");
  }

  // Split with a limit of 2 so (a) an entrypoint containing '=' is not truncated and (b) an unset
  // variable ("AIRBYTE_ENTRYPOINT=") yields a single-element array instead of throwing
  // ArrayIndexOutOfBoundsException before the intended error below can be raised.
  var splits = logs.split("=", 2);
  if (splits.length < 2 || splits[1].strip().isEmpty()) {
    throw new RuntimeException(
        "Unable to read AIRBYTE_ENTRYPOINT from the image. Make sure this environment variable is set in the Dockerfile!");
  }
  return splits[1].strip();
}

/**
 * Looks up the pod with the given name in the given namespace and returns its reported IP address.
 *
 * @param client Kubernetes client used for the lookup.
 * @param podName name of the pod to resolve.
 * @param namespace namespace the pod lives in.
 * @return the pod's IP as reported by its status.
 * @throws RuntimeException if no pod with that name exists in the namespace.
 */
public static String getPodIP(KubernetesClient client, String podName, String namespace) {
  final var found = client.pods().inNamespace(namespace).withName(podName).get();
  if (found != null) {
    return found.getStatus().getPodIP();
  }
  throw new RuntimeException("Error: unable to find pod!");
}

public KubePodProcess(KubernetesClient client, String podName, String namespace, String image, int stdoutLocalPort, boolean usesStdin)
throws IOException, InterruptedException {
this.client = client;

Expand All @@ -84,7 +133,8 @@ public KubePodProcess(KubernetesClient client, String podName, String image, int
});

// create pod
String entrypoint = KubeProcessBuilderFactoryPOC.getCommandFromImage(image);
String entrypoint = getCommandFromImage(client, image, namespace);
LOGGER.info("Found entrypoint: {}", entrypoint);

Volume volume = new VolumeBuilder()
.withName("airbyte-pipes")
Expand Down Expand Up @@ -141,14 +191,14 @@ public KubePodProcess(KubernetesClient client, String podName, String image, int
.build();

LOGGER.info("Creating pod...");
this.podDefinition = client.pods().inNamespace("default").createOrReplace(pod);
this.podDefinition = client.pods().inNamespace(namespace).createOrReplace(pod);

LOGGER.info("Waiting until pod is ready...");
client.resource(podDefinition).waitUntilReady(5, TimeUnit.MINUTES);

// allow writing stdin to pod
LOGGER.info("Reading pod IP...");
var podIp = KubeProcessBuilderFactoryPOC.getPodIP(podName);
var podIp = getPodIP(client, podName, namespace);
LOGGER.info("Pod IP: {}", podIp);

if (usesStdin) {
Expand Down Expand Up @@ -179,7 +229,19 @@ public InputStream getErrorStream() {

@Override
public int waitFor() throws InterruptedException {
  // Block until the pod reaches a terminal phase, then tear down local resources.
  client.resource(podDefinition).waitUntilCondition(this::isTerminal, 10, TimeUnit.DAYS);
  // These are closed in the opposite order in which they are created to prevent any resource
  // conflicts.
  try {
    this.stdin.close();
    this.stdoutServerSocket.close();
    this.stdout.close();
  } catch (IOException e) {
    LOGGER.warn("Error while closing sockets and streams: ", e);
    // Process#waitFor only declares InterruptedException; preserve the underlying IO failure as
    // the cause (the previous bare `new InterruptedException()` discarded it entirely).
    final InterruptedException interrupted = new InterruptedException("Failed to close pod sockets and streams.");
    interrupted.initCause(e);
    throw interrupted;
  }
  this.executorService.shutdownNow();

  return exitValue();
}

Expand All @@ -195,7 +257,7 @@ private boolean isTerminal(Pod pod) {
}

private int getReturnCode(Pod pod) {
Pod refreshedPod = client.pods().withName(pod.getMetadata().getName()).get(); // todo: use more robust version here
Pod refreshedPod = client.pods().inNamespace(pod.getMetadata().getNamespace()).withName(pod.getMetadata().getName()).get();
Preconditions.checkArgument(isTerminal(refreshedPod));

return refreshedPod.getStatus().getContainerStatuses()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,159 +24,80 @@

package io.airbyte.workers.process;

import com.google.common.collect.Lists;
import io.airbyte.commons.io.IOs;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.utils.HttpClientUtils;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.lang3.RandomStringUtils;
import okhttp3.OkHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KubeProcessBuilderFactoryPOC {

private static final Logger LOGGER = LoggerFactory.getLogger(KubeProcessBuilderFactoryPOC.class);

private static final KubernetesClient KUBE_CLIENT = new DefaultKubernetesClient();

// todo: this should really be cached
public static String getCommandFromImage(String imageName) throws IOException {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved all these functions to KubePodProcess because there was a hidden cyclic dependency: the process builder factory called the KubePodProcess class, which in turn called back into the factory for these static functions while creating the process.

final String suffix = RandomStringUtils.randomAlphabetic(5).toLowerCase();

final String podName = "airbyte-command-fetcher-" + suffix;

final List<String> cmd =
Lists.newArrayList(
"kubectl",
"run",
"--generator=run-pod/v1",
// "--rm",
"-i",
"--pod-running-timeout=24h",
"--image=" + imageName,
"--command=true",
"--restart=Never",
podName,
"--",
"sh",
"-c",
"echo \"AIRBYTE_ENTRYPOINT=$AIRBYTE_ENTRYPOINT\"");

Process start = new ProcessBuilder(cmd).start();

try (BufferedReader reader = IOs.newBufferedReader(start.getInputStream())) {
String line;
while ((line = reader.readLine()) != null && !line.contains("AIRBYTE_ENTRYPOINT"));

if (line == null || !line.contains("AIRBYTE_ENTRYPOINT")) {
throw new RuntimeException("Unable to read AIRBYTE_ENTRYPOINT from the image. Make sure this environment variable is set in the Dockerfile!");
} else {
String[] splits = line.split("=", 2);
if (splits.length == 1) {
throw new RuntimeException(
"Unable to read AIRBYTE_ENTRYPOINT from the image. Make sure this environment variable is set in the Dockerfile!");
} else {
return splits[1];
}
}
}
}

/**
 * Preserved experiment: launches a worker process from a pod template, writes a byte to its stdin,
 * reads a byte from its stdout, and waits for it to exit. Kept for reference only; not called from
 * the current main flow.
 */
private static void saveJaredWork() {
  try {
    // todo: test this with args that are used by the process
    Process process = new KubeProcessBuilderFactory(Path.of("stdout_template.yaml"))
        .create(0L, 0, Path.of("/tmp"), "np_source:dev", null)
        .start();

    process.getOutputStream().write(100);
    process.getInputStream().read();

    // after running this main:
    // kubectl port-forward airbyte-worker-0-0-fmave 9000:9000
    // socat -d -d -d TCP-LISTEN:9000,bind=127.0.0.1 stdout

    LOGGER.info("waiting...");
    int code = process.waitFor();
    LOGGER.info("code = " + code);
  } catch (Exception e) {
    // Log the throwable through the logger so the stack trace lands in the log instead of being
    // split between LOGGER.error(e.getMessage()) and printStackTrace() on stderr.
    LOGGER.error("Error while running worker process: ", e);
  }
}

/**
 * Resolves a pod's IP by listing all pods in the "default" namespace and matching on name.
 * Returns {@code null} when no pod with the given name is found.
 */
public static String getPodIP(String podName) {
  // TODO: Why does directly searching for the pod not work?
  // TODO: We could assign labels to pods to narrow the search.
  PodList pods = KUBE_CLIENT.pods().inNamespace("default").list();
  return pods.getItems().stream()
      .filter(candidate -> candidate.getMetadata().getName().equals(podName))
      .findFirst()
      .map(candidate -> {
        LOGGER.info("Found IP!");
        return candidate.getStatus().getPodIP();
      })
      .orElse(null);
}

public static void main(String[] args) throws InterruptedException, IOException {
LOGGER.info("Launching source process...");
Process src = new KubePodProcess(KUBE_CLIENT, "src", "np_source:dev", 9002, false);
Process src = new KubePodProcess(KUBE_CLIENT, "src", "default", "np_source:dev", 9002, false);

LOGGER.info("Launching destination process...");
Process dest = new KubePodProcess(KUBE_CLIENT, "dest", "np_dest:dev", 9003, true);
Process dest = new KubePodProcess(KUBE_CLIENT, "dest", "default", "np_dest:dev", 9003, true);

LOGGER.info("Launching background thread to read destination lines...");
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
var listenTask = executor.submit(() -> {
BufferedReader reader = new BufferedReader(new InputStreamReader(dest.getInputStream()));

while (true) {
try {
String line;
if ((line = reader.readLine()) != null) {
LOGGER.info("Destination sent: {}", line);
}
} catch (IOException e) {
e.printStackTrace();
try {
String line;
while ((line = reader.readLine()) != null) {
LOGGER.info("Destination sent: {}", line);
}
} catch (IOException e) {
e.printStackTrace();
}
});

LOGGER.info("Copying source stdout to destination stdin...");

BufferedReader reader = IOs.newBufferedReader(src.getInputStream());
PrintWriter writer = new PrintWriter(dest.getOutputStream(), true);

String line;
while ((line = reader.readLine()) != null) {
writer.println(line);
try (BufferedReader reader = IOs.newBufferedReader(src.getInputStream())) {
try (PrintWriter writer = new PrintWriter(dest.getOutputStream(), true)) {
String line;
while ((line = reader.readLine()) != null) {
writer.println(line);
}
}
}
writer.close();

LOGGER.info("Waiting for source...");
LOGGER.info("Waiting for source process to terminate...");
src.waitFor();
LOGGER.info("Waiting for destination...");
LOGGER.info("Waiting for destination process to terminate...");
dest.waitFor();

LOGGER.info("Closing sync worker resources...");
listenTask.cancel(true);
executor.shutdownNow();
// TODO(Davin, issue-3611): Figure out why these commands are not effectively shutting down OkHTTP even though documentation suggests so. See
// https://square.github.io/okhttp/4.x/okhttp/okhttp3/-ok-http-client/#shutdown-isnt-necessary
// Instead, the pod shuts down after 5 minutes as the pool reaps the remaining idle connection after
// 5 minutes of inactivity, as per the default configuration.
// OK_HTTP_CLIENT.dispatcher().executorService().shutdownNow();
// OK_HTTP_CLIENT.connectionPool().evictAll();
// The Kube client has issues with closing the client. Since manually injecting the OkHttp client also doesn't work, it is not clear whether it's OkHTTP or the Fabric client at fault.
// See https://github.com/fabric8io/kubernetes-client/issues/2403.
KUBE_CLIENT.close();
LOGGER.info("Done!");
// Manually exit for the time being.
System.exit(0);

System.exit(0); // todo: handle executors so we don't need to kill the JVM
}

}
Loading