-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Productionise Kube #3623
Productionise Kube #3623
Conversation
@@ -1,4 +1,4 @@ | |||
FROM alpine:latest | |||
COPY run.sh /tmp/run.sh | |||
ENV AIRBYTE_ENTRYPOINT="/tmp/run.sh" | |||
ENTRYPOINT /tmp/run.sh | |||
ENV AIRBYTE_ENTRYPOINT=">&2 /tmp/run.sh" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is here to demonstrate the StdErr redirection works. Will remove before merging.
@@ -47,15 +41,16 @@ | |||
|
|||
public static void main(String[] args) throws InterruptedException, IOException { | |||
LOGGER.info("Launching source process..."); | |||
Process src = new KubePodProcess(KUBE_CLIENT, "src", "default", "np_source:dev", 9002, false); | |||
Process src = new KubePodProcess(KUBE_CLIENT, "src", "default", "np_source:dev", 9002, 9003, false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Majority of the changes in this file are used to demonstrate that redirecting standard error works. They will be removed before merging.
|
||
LOGGER.info("Launching background thread to read destination lines..."); | ||
ExecutorService executor = Executors.newSingleThreadExecutor(); | ||
var listenTask = executor.submit(() -> { | ||
BufferedReader reader = new BufferedReader(new InputStreamReader(dest.getInputStream())); | ||
BufferedReader reader = new BufferedReader(new InputStreamReader(src.getErrorStream())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In combination with the changes in the dockerfile (which routes standard out to standard err), I'm seeing
2021-05-26 14:36:17 INFO i.a.w.p.KubeProcessBuilderFactoryPOC(lambda$main$0):58 - {} - Destination sent: {"msg": "one"}
2021-05-26 14:36:17 INFO i.a.w.p.KubeProcessBuilderFactoryPOC(lambda$main$0):58 - {} - Destination sent: {"msg": "two"}
2021-05-26 14:36:18 INFO i.a.w.p.KubeProcessBuilderFactoryPOC(lambda$main$0):58 - {} - Destination sent: {"msg": "three"}
2021-05-26 14:36:19 INFO i.a.w.p.KubeProcessBuilderFactoryPOC(lambda$main$0):58 - {} - Destination sent: {"msg": "four"}
2021-05-26 14:36:20 INFO i.a.w.p.KubeProcessBuilderFactoryPOC(lambda$main$0):58 - {} - Destination sent: {"msg": "five"}
2021-05-26 14:36:21 INFO i.a.w.p.KubeProcessBuilderFactoryPOC(lambda$main$0):58 - {} - Destination sent: {"msg": "six"}
2021-05-26 14:36:22 INFO i.a.w.p.KubeProcessBuilderFactoryPOC(lambda$main$0):58 - {} - Destination sent: {"msg": "seven"}
2021-05-26 14:36:23 INFO i.a.w.p.KubeProcessBuilderFactoryPOC(lambda$main$0):58 - {} - Destination sent: {"msg": "eight"}
in logs, which confirms this is working.
@@ -3,7 +3,9 @@ plugins { | |||
} | |||
|
|||
dependencies { | |||
testImplementation 'org.apache.commons:commons-lang3:3.11' | |||
implementation 'org.apache.commons:commons-compress:1.20' | |||
implementation 'com.github.docker-java:docker-java:3.2.8' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not super fan of bringing docker into every single project. Commons should remain lean
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(we might need to create a docker-commons or smth like that
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will deal with this before we merge this into master
@@ -35,4 +36,8 @@ public static String join(Iterable<?> iterable, CharSequence separator) { | |||
.collect(Collectors.joining(separator)); | |||
} | |||
|
|||
public static String addRandomSuffix(String base, int suffixLength) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since it is a common, the separator should also be a parameter
* Use CDK to generate source that can be configured to emit a certain number of records and always works. * Checkpoint: socat works from inside the docker container. * Override the entry point. * Clean up and add ReadMe. * Clean up socat. * Checkpoint: connect to Kube cluster and list all the pods. * Checkpoint: Sync worker pod is able to send output to the destination pod. * Checkpoint: Sync worker creates Dest pod if none existed previously. It also waits for the pod to be ready before doing anything else. Sync worker will also remove the pod on termination. * update readme * Checkpoint: Dest pod does nott restart after finishing. Comment out delete command in Sync worker. * working towards named pipes * named pipes working * update readme * WIP named pipe / socat sidecar kube port forwarding (#3518) * nearly working sources * update * stdin example * move all kube testing yamls into the airbyte-workers directories. sort the airbyte-workers resource folder; place all the poc yamls together. * Format. * Put back the original KubeProcessBuilderFactory. * Fix slight errors. * Checkpoint: Worker pod knows its own IP. Successfully starts and writes to Dest pod after refactor. * remove unused file and update readme * Dest pod loops back into worker pod. However, the right messages do not seem to be passing in. * Switch back to worker ip. * SWEET VICTORY!. * wrap kube pod in process (#3540) also clean up kubernetes deploys. * More clean up. (#3586) The first 6 points of #3464. The only interesting thing about this PR is the kube pod shutdown. For whatever reason, the OkHttpPool isn't respecting the evictAll call and 1 idle thread remains. So instead of shutting down immediately, the worker pod shuts down after 5 mins when the idle thread id reaped. There isn't an easy way to modify the pool's idle reap configuration now. I do not think this issue is blocking since it's relatively benign, so I vote we create a ticket and come back to this once we do an e2e test. * Implements redirecting standard error as well. (#3623) * Clean up before next implementation. * kube process launching (#3790) * processes must handle file mounting * remove comment * default to base entrypoint * use process builder factory / select stdin / use a pool of ports * fix up * add super hacky copying example * Checkpoint: Works end to end! * Checkpoint: Use API to make sure init container is ready instead of blind sleep. Propagate exception in DefaultCheckConnectionWorker. * Refactor KubePodProcess. Checked to make sure everything still works. * Format. * Clean up code. Begin putting this into variables and breaking up long constructor function. * Add comments to explain what is happening. * fix normalization test * increase timeout for initcontainer Co-authored-by: Davin Chia <[email protected]> * facepalm moment * clean up kube poc pr (#3834) * clean up * remove source-always-works * create separate commons-docker * fix test * enable kube e2e tests (#3866) * enable kube e2e tests * use more generally accepted env definition * use new runners * use its own runner and install minikube differently * update name * use kubectl alias * use link instead of alias that doesn't propagate * start minikube * use driver=none * go back to using action * mess with versions * revert runner * install socat * print logs after run * also try re-runnining tasks * always wait for file transfer * use ports * increase wait timeout for kube * use different localhost ips and bump normalization to include an entrypoint * proposed fix * all working locally * revert temporary changes * revert normalization image change that's happening in a separate pr * readability * final comment * Working Kube Cancel. (#3983) * Port over the basic changes. * Add logic to return proper exit code in the event of termination. Add comments to explain why. * revert envs change and merge master to fix kube acceptance tests (#4012) * use older env format * fix build Co-authored-by: jrhizor <[email protected]> Co-authored-by: Jared Rhizor <[email protected]>
What
Implements redirecting standard error as well.
How
This uses the same approach we previously had with redirecting stdin and stdout.
The interesting part of this PR is some of the bits where I'm starting to automate the tests around the
KubePodProcess
.In particular, testing images are now automatically built from Dockerfiles in test/resource. I trying to test the stdout/stderr pipes and realising we currently assume the calling process is reachable from Kube namespace.
Pre-merge Checklist
Recommended reading order
KubePodProcess
KubePodProcessTest