diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml deleted file mode 100644 index 9773b997..00000000 --- a/.github/workflows/docker.yml +++ /dev/null @@ -1,15 +0,0 @@ -name: docker - -on: - push: - branches: master - pull_request: - branches: [ master ] - -jobs: - local_queue_example: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v2 - - name: build local queue example - run: docker build -f examples/local_queue_example/Dockerfile . \ No newline at end of file diff --git a/.github/workflows/examples.yml b/.github/workflows/examples.yml new file mode 100644 index 00000000..0dffe94d --- /dev/null +++ b/.github/workflows/examples.yml @@ -0,0 +1,79 @@ +name: examples + +on: + push: + branches: master + pull_request: + branches: [ master ] + +jobs: + local_queue_example: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - name: build local queue example + run: docker build -f examples/local_queue_example/Dockerfile . + + kubernetes_example_local_test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + with: + path: './turbolift' + - name: install rustup and rust nightly + run: | + curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain nightly-2020-09-28 + - name: run tests + run: | + cd turbolift/examples/kubernetes_example + RUSTFLAGS='--cfg procmacro2_semver_exempt' cargo test -- --nocapture + + kubernetes_example_local_run: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + with: + path: './turbolift' + - name: install rustup and rust nightly + run: | + curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain nightly-2020-09-28 + - name: run tests + run: | + cd turbolift/examples/kubernetes_example + RUSTFLAGS='--cfg procmacro2_semver_exempt' cargo run + + kubernetes_example_distributed_test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + with: + path: './turbolift' + - name: install rustup and rust nightly + run: | + curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain nightly-2020-09-28 + - uses: engineerd/setup-kind@v0.5.0 + with: + version: "v0.11.0" + - name: run tests + run: | + cd turbolift/examples/kubernetes_example + sh setup_cluster.sh + RUSTFLAGS='--cfg procmacro2_semver_exempt' cargo test --features distributed -- --nocapture + + kubernetes_example_distributed_run: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + with: + path: './turbolift' + - name: install rustup and rust nightly + run: | + curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain nightly-2020-09-28 + - uses: engineerd/setup-kind@v0.5.0 + with: + version: "v0.11.0" + - name: run tests + run: | + cd turbolift/examples/kubernetes_example + sh setup_cluster.sh + RUSTFLAGS='--cfg procmacro2_semver_exempt' cargo run --features distributed diff --git a/.github/workflows/rust.yml b/.github/workflows/linting.yml similarity index 64% rename from .github/workflows/rust.yml rename to .github/workflows/linting.yml index 4d9294c1..5f955bf8 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/linting.yml @@ -1,8 +1,8 @@ -name: rust +name: linting on: push: - branches: [ master ] + branches: master pull_request: branches: [ master ] @@ -10,7 +10,7 @@ env: CARGO_TERM_COLOR: always jobs: - test: + lints: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 @@ -22,15 +22,15 @@ jobs: override: true - name: Rustup run: rustup update - - name: Format + - 
name: Format turbolift
        run: RUSTFLAGS='--cfg procmacro2_semver_exempt' cargo fmt
+      - name: Format turbolift_internals
+        run: cd turbolift_internals && RUSTFLAGS='--cfg procmacro2_semver_exempt' cargo fmt
+      - name: Format turbolift_macros
+        run: cd turbolift_macros && RUSTFLAGS='--cfg procmacro2_semver_exempt' cargo fmt
       - name: Check without distributed feature
         run: RUSTFLAGS='--cfg procmacro2_semver_exempt' cargo check
       - name: Check with distributed feature
-        run: RUSTFLAGS='--cfg procmacro2_semver_exempt' cargo check --features "distributed"
+        run: RUSTFLAGS='--cfg procmacro2_semver_exempt' cargo check --features distributed
       - name: Clippy
         run: RUSTFLAGS='--cfg procmacro2_semver_exempt' cargo clippy -- -D warnings
-      - name: Local Queue without distributed feature
-        run: cd examples/local_queue_example && RUSTFLAGS='--cfg procmacro2_semver_exempt' cargo test
-      - name: Local Queue with distributed feature
-        run: cd examples/local_queue_example && RUSTFLAGS='--cfg procmacro2_semver_exempt' cargo test --features "distributed"
diff --git a/.gitignore b/.gitignore
index fa78f90a..1af1252e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,5 @@
 /target
 target/
 Cargo.lock
-.turbolift/
\ No newline at end of file
+.turbolift/
+.idea
\ No newline at end of file
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index c7d4ff99..9f41d618 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1,3 +1,4 @@
+repos:
 - repo: local
   hooks:
     - id: format
@@ -16,16 +17,31 @@
       types: [rust]
       pass_filenames: false
     - id: check
-      name: Check
+      name: Cargo Check (local)
       description: Runs `cargo check` on the repository.
       entry: bash -c 'RUSTFLAGS='"'"'--cfg procmacro2_semver_exempt'"'"' cargo check "$@"' --
       language: system
      types: [ rust ]
       pass_filenames: false
     - id: check
-      name: Check
+      name: Cargo Check (distributed)
       description: Runs `cargo check` on the repository with distributed flag
-      entry: bash -c 'RUSTFLAGS='"'"'--cfg procmacro2_semver_exempt'"'"' cargo check --features "distributed" "$@"' --
+      entry: bash -c 'RUSTFLAGS='"'"'--cfg procmacro2_semver_exempt'"'"' cargo check --features distributed "$@"' --
       language: system
       types: [ rust ]
-      pass_filenames: false
\ No newline at end of file
+      pass_filenames: false
+- repo: https://github.com/pre-commit/pre-commit-hooks
+  rev: v3.2.0
+  hooks:
+    - id: trailing-whitespace
+    - id: mixed-line-ending
+    - id: end-of-file-fixer
+    - id: detect-private-key
+    - id: check-merge-conflict
+    - id: check-toml
+    - id: check-yaml
+- repo: https://github.com/jumanjihouse/pre-commit-hooks
+  rev: 2.1.4
+  hooks:
+    - id: markdownlint
+    - id: shellcheck
diff --git a/Cargo.toml b/Cargo.toml
index 85022eea..e215a250 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,22 +1,26 @@
 [package]
 name = "turbolift"
-version = "0.1.0"
+version = "0.1.1"
 authors = ["Dominic Burkart <@DominicBurkart>"]
 edition = "2018"
-description = "WIP"
+description = "Easy distribution interface"
+keywords = ["distribution", "distributed", "kubernetes", "K8s"]
+categories = ["build-utils", "development-tools", "concurrency", "network-programming", "asynchronous"]
 readme = "README.md"
+homepage = "https://dominic.computer/turbolift"
 license = "Hippocratic-2.1"
+repository = "https://github.com/dominicburkart/turbolift/"

 [features]
-distributed = ["chrono", "cached", "async-std"]
-service = ["actix-web", "serde_json"]
+distributed = ["chrono", "turbolift_macros/distributed"]
 # todo we can optimize reqs for children with this load

 [dependencies]
 turbolift_macros = { path = "./turbolift_macros"
} turbolift_internals = { path = "./turbolift_internals" } -async-std = { version = "1.6", optional = true } chrono = { version = "0.4", optional = true } -cached = { version = "0.19", optional = true } -actix-web = { version = "3", optional = true } -serde_json = { version = "1", optional = true } \ No newline at end of file +actix-web = { version = "3" } +serde_json = { version = "1" } +tokio-compat-02 = { version = "0.1" } +tracing = {version="0.1", features=["attributes"]} +tracing-futures = "0.2.4" diff --git a/README.md b/README.md index b417e3c0..d3b0b69d 100644 --- a/README.md +++ b/README.md @@ -4,64 +4,89 @@ src="https://img.shields.io/crates/v/turbolift.svg" alt="turbolift’s current version badge" title="turbolift’s current version badge" /> -[![status](https://github.com/DominicBurkart/turbolift/workflows/rust/badge.svg)](https://github.com/DominicBurkart/turbolift/actions?query=is%3Acompleted+branch%3Amaster+workflow%3A"rust") -[![status](https://github.com/DominicBurkart/turbolift/workflows/docker/badge.svg)](https://github.com/DominicBurkart/turbolift/actions?query=is%3Acompleted+branch%3Amaster+workflow%3A"docker") +[![status](https://img.shields.io/github/checks-status/dominicburkart/turbolift/master)](https://github.com/DominicBurkart/turbolift/actions?query=branch%3Amaster) +[![website](https://img.shields.io/badge/-website-blue)](https://dominic.computer/turbolift) -Turbolift is a WIP distribution platform for rust. It's designed to make distribution an afterthought -by extracting and distributing specific functions and their dependencies from a larger rust application. -Turbolift then acts as the glue between these extracted mini-apps and the main application. +Turbolift is a WIP distribution platform for rust. It's designed to make +distribution an afterthought by extracting and distributing specific +functions and their dependencies from a larger rust application. +Turbolift then acts as the glue between these extracted mini-apps and +the main application. -Look in the [examples](https://github.com/DominicBurkart/turbolift/tree/master/examples) directory for -full projects with working syntax examples. +Look in the [examples](https://github.com/DominicBurkart/turbolift/tree/master/examples) +directory for full projects with working syntax examples. -## Distribution as an afterthought. -Turbolift allows developers to turn normal rust functions into distributed services - just by tagging them with a macro. This lets you develop in a monorepo environment, -but benefit from the scalability of microservice architectures. +## Distribution as an afterthought -## Orchestration with a feature flag. -For quicker development builds, `cargo build` doesn't build the distributed version of your code by default. -Distribution is feature-gated so that it's easy to turn on (for example, in production), or off (for example, -while developing locally). +Turbolift allows developers to turn normal rust functions into distributed services +just by tagging them with a macro. Right now, Turbolift only works with K8s, though +it's designed to be extended to other cluster management utilities. + +## Orchestration with a feature flag + +Distribution is feature-gated in Turbolift, so it's easy to activate distribution +for some builds and deactivate it for others (while developing locally, for +example). ## Important implementation notes -- implemented over http using `reqwest` and `actix-web` (no current plans to refactor to use a lower level network protocol). 
-- assumes a secure network– function parameters are sent in plaintext to the microservice.
-- source vulnerability: when building, anything in the project directory or in local dependencies
-declared in the project manifest could be bundled and sent over the network to workers.
+
+- implemented over http using `reqwest` and `actix-web` (no current plans to
+refactor to use a lower level network protocol).
+- assumes a secure network: function parameters are sent in plaintext to the
+microservice.
+- source vulnerability: when building, anything in the project directory or in
+local dependencies declared in the project manifest could be bundled and sent
+over the network to workers.
+
+More information is available on the [project homepage](https://dominic.computer/turbolift).

 ## Current Limitations
-- *Because of reliance on unstable proc_macro::Span features, all programs using turbolift need to
-be built with an unstable nightly compiler flag (e.g. `RUSTFLAGS='--cfg procmacro2_semver_exempt' cargo build`)* ([tracking issue](https://github.com/rust-lang/rust/issues/54725)).
-- Functions are assumed to be pure (lacking side-effects such as
-writing to the file system or mutation of a global variable).
-*Today, this is not enforced by the code.*
-- For a function to be distributed, its inputs and outputs have to be (de)serializable with [Serde](https://github.com/serde-rs/serde).
+
+- DO NOT RUN TURBOLIFT ON A PUBLIC-FACING CLUSTER.
+- *Because of reliance on unstable proc_macro::Span features, all programs
+using turbolift need to be built with an unstable nightly compiler flag (e.g.
+`RUSTFLAGS='--cfg procmacro2_semver_exempt' cargo build`)*
+([tracking issue](https://github.com/rust-lang/rust/issues/54725)).
+- Functions are assumed to be pure (lacking side-effects such as
+writing to the file system or mutation of a global variable).
+*Today, this is not enforced by the code.*
+- For a function to be distributed, its inputs and outputs have to be
+(de)serializable with [Serde](https://github.com/serde-rs/serde).
 - Distributed functions cannot be nested in other functions.
 - Distributed functions cannot be methods.
 - Distributed functions cannot use other functions called `main`.
-- Distributed functions not in `main.rs` cannot use functions declared
+- Distributed functions not in `main.rs` cannot use functions declared
 in `main.rs`.
 - Distributed functions cannot have `-> impl Trait` types.
-- Unused functions that have been marked with the `on` macro will still be
-compiled for distribution, even if eventually the linker will then
+- Unused functions that have been marked with the `on` macro will still be
+compiled for distribution, even if eventually the linker will then
 remove the completed binary and distribution code.
-- *Turbolift doesn't match the cargo compilation settings for microservices yet.*
-- projects can have relative local dependencies listing in the cargo manifest, but those dependencies themselves
-should not have relative local dependencies prone to breaking.
-- if your program produces side effects when initialized, for example when
-global constants are initialized, those side effects may be triggered
+- projects can have relative local dependencies listed in the cargo
+manifest, but those dependencies themselves should not have relative local
+dependencies prone to breaking.
+- if your program produces side effects when initialized, for example when
+global constants are initialized, those side effects may be triggered
 for each function call.
+- turbolift runs functions on an unreproducible linux build; it doesn't
+e.g. pin the env or match the OS of the current environment.

 ## Current Project Goals
-- [ ] support kubernetes ([pr](https://github.com/DominicBurkart/turbolift/pull/2)).
+
+- [X] support kubernetes ([pr](https://github.com/DominicBurkart/turbolift/pull/2)).
+- [ ] implement liveliness and readiness checks for pods.
+- [ ] while setting up a new service, wait for the pod to come alive via
+readiness check instead of just sleeping ([code location](https://github.com/DominicBurkart/turbolift/blob/6a63d09afcd6e7234e62bcb797d31730cf49aacf/turbolift_internals/src/kubernetes.rs#L257)).
 - [ ] roadmap support for other targets.
-- [X] only use distributed configuration when flagged (like in `cargo build --features "distributed"`). Otherwise,
-just transform the tagged function into an async function (to provide an identical API), but don't
-build any microservices or alter any code.
-- [ ] build cross-architecture compilation tests into the CI (RN we only test via github actions read Docker, and a different custom Docker test workflow)
+- [X] only use distributed configuration when flagged (like in
+`cargo build --features "distributed"`). Otherwise, just transform the
+tagged function into an async function (to provide an identical API), but
+don't build any microservices or alter any code.
+- [ ] build cross-architecture compilation tests into the CI.

 ## Current tech debt todo
+
 - [ ] start reducing ginormous API, right now basically everything is public
 - [ ] refactor split between turbolift_internals and turbolift_macros
 - [ ] improve names
+- [ ] send params in json as payload instead of directly in the url
+- [ ] we need to do a better job of cleaning up docker images, locally and in the cluster.
diff --git a/examples/README.md b/examples/README.md
new file mode 100644
index 00000000..7240d414
--- /dev/null
+++ b/examples/README.md
@@ -0,0 +1,22 @@
+# Turbolift Examples
+
+## Kubernetes
+
+Builds a deployment, service, and ingress rule for the function to be
+distributed. Cool features: distribution-as-a-feature, automatically deleting
+the pods, deployments, services, and ingress rule when the main program
+completes. This implementation is BYOC (Bring Your Own Container, you have to
+pass a special function while instantiating the cluster interface that makes
+the containers available in the cluster, perhaps via a private registry).
+
+## Local Queue
+
+The local queue example should never be used in a production application. It's
+designed to test the core features of turbolift (automatically extracting
+microservices from a rust codebase and running them on an http server), without
+any of the platform-specific code for e.g. running on kubernetes. Check this
+example out if you're interested in a bare-bones example turbolift project
+without any platform-specific specialization. Note: if you're looking to run
+code locally in turbolift instead of using a distribution platform, you should
+deactivate the distributed turbolift feature in your project's `Cargo.toml`.
+This will let your program run all services locally, e.g. while developing.
+A sketch of the interface is shown below.
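+
+As a minimal sketch of what a tagged function looks like (this mirrors
+`local_queue_example/src/main.rs`; the names `LOCAL` and `identity` are just
+this example's):
+
+```rust
+use turbolift::local_queue::LocalQueue;
+use turbolift::on;
+
+lazy_static::lazy_static! {
+    // the queue that extracted services are dispatched to
+    static ref LOCAL: tokio::sync::Mutex<LocalQueue> =
+        tokio::sync::Mutex::new(LocalQueue::new());
+}
+
+// with the `distributed` feature on, calls to `identity` become async
+// dispatches to a service managed by LOCAL; with it off, `identity`
+// simply becomes an async version of the same function.
+#[on(LOCAL)]
+fn identity(b: bool) -> bool {
+    b
+}
+```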
diff --git a/examples/kubernetes_example/Cargo.toml b/examples/kubernetes_example/Cargo.toml
new file mode 100644
index 00000000..3a52ea5e
--- /dev/null
+++ b/examples/kubernetes_example/Cargo.toml
@@ -0,0 +1,22 @@
+[package]
+name = "kubernetes_example"
+version = "0.1.0"
+authors = ["Dominic Burkart <1351120+DominicBurkart@users.noreply.github.com>"]
+edition = "2018"
+
+[features]
+"distributed" = ["turbolift/distributed"]
+
+[dependencies]
+rand = "0.7"
+tokio = {version="1", features=["full"]}
+lazy_static = "1"
+futures = "0.3"
+cute = "0.3"
+anyhow = "1.0.41"
+turbolift = { path="../../" }
+
+# for printing out tracing
+tracing = "0.1"
+tracing-futures = "0.2"
+tracing-subscriber = {version="0.2", features=["fmt"]}
diff --git a/examples/kubernetes_example/run_tests.sh b/examples/kubernetes_example/run_tests.sh
new file mode 100644
index 00000000..bdf1b56e
--- /dev/null
+++ b/examples/kubernetes_example/run_tests.sh
@@ -0,0 +1,30 @@
+#!/usr/bin/env sh
+
+# assumes cargo, rust nightly, kind, and kubectl are installed. run from turbolift/examples/kubernetes_example
+
+# error if any command fails
+set -e
+
+echo "🚡 running turbolift tests..."
+
+printf "\n😤 deleting current cluster if it exists\n"
+kind delete cluster # make sure we don't need the cluster when running locally
+
+printf "\n📍 running non-distributed tests\n"
+RUSTFLAGS='--cfg procmacro2_semver_exempt' cargo test -- --nocapture
+RUSTFLAGS='--cfg procmacro2_semver_exempt' cargo run
+echo "non-distributed tests completed."
+
+. setup_cluster.sh
+
+printf "\n🤸‍ run distributed tests\n"
+RUSTFLAGS='--cfg procmacro2_semver_exempt' cargo test --features distributed -- --nocapture
+RUSTFLAGS='--cfg procmacro2_semver_exempt' cargo run --features distributed
+echo "🤸 distributed tests completed."
+
+printf "\n📍 re-run non-distributed tests\n"
+RUSTFLAGS='--cfg procmacro2_semver_exempt' cargo test -- --nocapture
+RUSTFLAGS='--cfg procmacro2_semver_exempt' cargo run
+echo "📍 non-distributed tests completed."
+
+printf "\n🚡turbolift tests complete.\n"
diff --git a/examples/kubernetes_example/setup_cluster.sh b/examples/kubernetes_example/setup_cluster.sh
new file mode 100644
index 00000000..e9c2e7bb
--- /dev/null
+++ b/examples/kubernetes_example/setup_cluster.sh
@@ -0,0 +1,40 @@
+#!/usr/bin/env sh
+
+# assumes that kind is already setup
+
+printf "\n😤 deleting current cluster if it exists\n"
+kind delete cluster
+
+printf "\n👷 setting up cluster with custom ingress-compatible config\n"
+cat <<EOF | kind create cluster --config=-
[...]
+EOF
[...]
diff --git a/examples/kubernetes_example/src/main.rs b/examples/kubernetes_example/src/main.rs
new file mode 100644
[...]
+lazy_static! {
+    static ref K8S: Mutex<K8s> = Mutex::new(K8s::with_deploy_function_and_max_replicas(
+        Box::new(load_container_into_kind),
+        2
+    ));
+}
+
+/// The application writer is responsible for placing
+/// images where your cluster can access them. The
+/// K8s constructor has a parameter which takes
+/// a function that is called after the container is
+/// built, so that the container may be added to a
+/// specific registry or otherwise be made available.
+fn load_container_into_kind(tag: &str) -> anyhow::Result<&str> {
+    std::process::Command::new("kind")
+        .args(format!("load docker-image {}", tag).as_str().split(' '))
+        .status()?;
+    Ok(tag)
+}
+
+/// This is the function we want to distribute!
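+/// Tagging it with `#[on(K8S)]` is the whole integration: with the
+/// `distributed` feature, the macro extracts the function into its own
+/// service, containerizes it, and turns each call into an async dispatch
+/// through `K8S`; without the feature, the function just becomes async.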
+#[on(K8S)]
+fn square(u: u64) -> u64 {
+    u * u
+}
+
+fn random_numbers() -> Vec<u64> {
+    let mut pseud = thread_rng();
+    c![pseud.gen_range(0, 1000), for _i in 1..10]
+}
+
+fn main() {
+    // use tracing.rs to print info about the program to stdout
+    tracing_subscriber::fmt()
+        .with_max_level(tracing::Level::TRACE)
+        .with_writer(std::io::stdout)
+        .init();
+
+    let input = random_numbers();
+    let futures = c![square(*int), for int in &input];
+    let mut rt = tokio::runtime::Runtime::new().unwrap();
+    let output = rt.block_on(try_join_all(futures)).unwrap();
+    println!(
+        "\n\ncomputation complete.\ninput: {:?}\noutput: {:?}",
+        input, output
+    );
+    if output != c![x*x, for x in input] {
+        std::process::exit(1)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn it_works() {
+        let input = random_numbers();
+        let futures = c![square(*int), for int in &input];
+        let mut rt = tokio::runtime::Runtime::new().unwrap();
+        let output = rt.block_on(try_join_all(futures)).unwrap();
+        assert_eq!(
+            output,
+            input.into_iter().map(|x| x * x).collect::<Vec<u64>>()
+        );
+    }
+}
diff --git a/examples/local_queue_example/Cargo.toml b/examples/local_queue_example/Cargo.toml
index 45530e03..07b739a6 100644
--- a/examples/local_queue_example/Cargo.toml
+++ b/examples/local_queue_example/Cargo.toml
@@ -12,8 +12,10 @@ edition = "2018"
 rand = "0.7"
 futures = "0.3"
 lazy_static = "1"
-async-std = "1"
+tokio = {version="1", features=["full"]}
 turbolift = { path="../../" }

-# these are dependencies that need to be bundled into turbolift, but debugging how to reexport proc macros is hard.
-cached = "0.19"
\ No newline at end of file
+# for printing out tracing
+tracing = "0.1"
+tracing-futures = "0.2"
+tracing-subscriber = {version="0.2", features=["fmt"]}
diff --git a/examples/local_queue_example/Dockerfile b/examples/local_queue_example/Dockerfile
index 56d976cb..554a011c 100644
--- a/examples/local_queue_example/Dockerfile
+++ b/examples/local_queue_example/Dockerfile
@@ -1,6 +1,14 @@
 # run me in turbolift root! E.G.: "docker build -f examples/local_queue_example/Dockerfile ."
 FROM rustlang/rust:nightly
-COPY ./ ./
-WORKDIR examples/local_queue_example
-RUN RUSTFLAGS='--cfg procmacro2_semver_exempt' cargo +nightly test
-RUN RUSTFLAGS='--cfg procmacro2_semver_exempt' cargo +nightly test --features "distributed"
\ No newline at end of file
+RUN rustup default nightly-2020-09-28
+ENV RUSTFLAGS='--cfg procmacro2_semver_exempt'
+COPY ./ turbolift
+WORKDIR turbolift/examples/local_queue_example
+
+# test
+RUN cargo +nightly test -- --nocapture
+RUN cargo +nightly test --features distributed -- --nocapture
+
+# run
+RUN cargo +nightly run
+RUN cargo +nightly run --features distributed
diff --git a/examples/local_queue_example/src/main.rs b/examples/local_queue_example/src/main.rs
index 314c0418..6b09adc9 100644
--- a/examples/local_queue_example/src/main.rs
+++ b/examples/local_queue_example/src/main.rs
@@ -1,13 +1,14 @@
-use std::sync::Mutex;
-
 extern crate proc_macro;
-use async_std::task;
 use futures::future::try_join_all;
 use rand;
 use turbolift::local_queue::LocalQueue;
 use turbolift::on;
 #[macro_use]
 extern crate lazy_static;
+use tokio::sync::Mutex;
+
+use tracing::{self, info};
+use tracing_subscriber;

 lazy_static! {
     static ref LOCAL: Mutex<LocalQueue> = Mutex::new(LocalQueue::new());
@@ -19,6 +20,12 @@ fn identity(b: bool) -> bool {
 }

 fn main() {
+    // use tracing.rs to print info about the program to stdout
+    tracing_subscriber::fmt()
+        .with_max_level(tracing::Level::TRACE)
+        .with_writer(std::io::stdout)
+        .init();
+
     let input = vec![rand::random(), rand::random(), rand::random()];
     let futures = {
         let mut v = Vec::new();
@@ -27,8 +34,15 @@
         }
         v
     };
-    let output = task::block_on(try_join_all(futures)).unwrap();
-    println!("input: {:?}\noutput: {:?}", input, output);
+    let mut rt = tokio::runtime::Runtime::new().unwrap();
+    let output = rt.block_on(try_join_all(futures)).unwrap();
+    println!(
+        "\n\nAll responses received.\ninput: {:?}\noutput: {:?}",
+        input, output
+    );
+    if output != input {
+        std::process::exit(1)
+    }
 }

 #[cfg(test)]
@@ -37,6 +51,7 @@

     #[test]
     fn it_works() {
+        println!("test started");
         let input = vec![rand::random(), rand::random(), rand::random()];
         let futures = {
             let mut v = Vec::new();
@@ -45,7 +60,8 @@
             }
             v
         };
-        let output = task::block_on(try_join_all(futures)).unwrap();
+        let mut rt = tokio::runtime::Runtime::new().unwrap();
+        let output = rt.block_on(try_join_all(futures)).unwrap();
         assert_eq!(input, output);
     }
 }
diff --git a/src/lib.rs b/src/lib.rs
index 3a0bd11f..02f4b3cb 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,14 +1,10 @@
 #[cfg(feature = "distributed")]
-pub use async_std;
-#[cfg(feature = "distributed")]
-pub use cached;
-#[cfg(feature = "distributed")]
 pub use chrono;

-#[cfg(feature = "service")]
 pub use actix_web;
-#[cfg(feature = "service")]
 pub use serde_json;
+pub use tokio_compat_02;
+pub use tracing;

 pub use distributed_platform::{DistributionPlatform, DistributionResult};
 pub use turbolift_internals::*;
diff --git a/turbolift_internals/Cargo.toml b/turbolift_internals/Cargo.toml
index 974ef845..f597bfd6 100644
--- a/turbolift_internals/Cargo.toml
+++ b/turbolift_internals/Cargo.toml
@@ -7,25 +7,36 @@ edition = "2018"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

 [dependencies]
-syn = {version = "1", features=["full"] }
+syn = { version = "1", features=["full"] }
 quote = "1"
 serde = "1"
 serde_json = "1"
 brotli2 = "0.3.2"
 data-encoding = "2"
 futures = "0.3"
-proc-macro2 = { version = "1.0", features = ["span-locations"]}
+proc-macro2 = { version = "1", features = ["span-locations"]}
 tar = "0.4"
 toml = "0.5"
 cargo-toml2 = "1"
 tempfile = "3.1"
-tokio = { version = "0.2", features = ["full"] }
-surf = "1.0.3"
+reqwest = "0.11"
+tokio = { version = "1", features = ["full"] }
+tokio-compat-02 = "0.1"
 cute = "0.3"
 rand = "0.7"
 url = "2"
 lazy_static = "1"
 anyhow = "1"
 cached = "0.19"
-async-std = "1.6"
-async-trait = "0.1"
\ No newline at end of file
+async-trait = "0.1"
+get_if_addrs = "0.5.3"
+regex = "1"
+tracing = {version="0.1", features=["attributes"]}
+tracing-futures = "0.2.4"
+uuid = { version="0.8", features=["v4"] }
+derivative = "2.2.0"
+
+# kubernetes-specific requirements
+kube = "0.51.0"
+kube-runtime = "0.51.0"
+k8s-openapi = { version = "0.11.0", default-features = false, features = ["v1_20"] }
diff --git a/turbolift_internals/src/build_project.rs b/turbolift_internals/src/build_project.rs
index 2293f336..bafad3b1 100644
--- a/turbolift_internals/src/build_project.rs
+++ b/turbolift_internals/src/build_project.rs
@@ -1,14 +1,24 @@
-use std::collections::HashSet;
+use std::collections::{HashSet, VecDeque};
+use std::ffi::OsStr;
 use std::fs;
-use std::path::Path;
+use
std::path::{Path, PathBuf}; use std::process::Command; +use std::str::FromStr; -use crate::utils::symlink_dir; +use crate::utils::{symlink_dir, IS_RELEASE, RELEASE_FLAG}; -pub fn edit_cargo_file(cargo_path: &Path, function_name: &str) -> anyhow::Result<()> { +#[tracing::instrument] +pub fn edit_cargo_file( + original_project_source_dir: &Path, + cargo_path: &Path, + function_name: &str, +) -> anyhow::Result<()> { + let project_canonical = original_project_source_dir.canonicalize()?; + + let local_deps_dir_name = ".local_deps"; let mut parsed_toml: cargo_toml2::CargoToml = cargo_toml2::from_path(cargo_path) .unwrap_or_else(|_| panic!("toml at {:?} could not be read", cargo_path)); - let relative_local_deps_cache = cargo_path.parent().unwrap().join(".local_deps"); + let relative_local_deps_cache = cargo_path.parent().unwrap().join(local_deps_dir_name); fs::create_dir_all(&relative_local_deps_cache)?; let local_deps_cache = relative_local_deps_cache.canonicalize()?; @@ -23,21 +33,23 @@ pub fn edit_cargo_file(cargo_path: &Path, function_name: &str) -> anyhow::Result let details = deps .iter_mut() // only full dependency descriptions (not simple version number) - .filter_map(|(_name, dep)| match dep { + .filter_map(|(name, dep)| match dep { cargo_toml2::Dependency::Simple(_) => None, - cargo_toml2::Dependency::Full(detail) => Some(detail), + cargo_toml2::Dependency::Full(detail) => Some((name, detail)), }); + let mut completed_locations = HashSet::new(); - for detail in details { + for (name, detail) in details { // only descriptions with a path if let Some(ref mut buf) = detail.path { // determine what the symlink for this dependency should be - let canonical = buf.canonicalize()?; - let dep_location = local_deps_cache.join( - &canonical - .file_name() - .unwrap_or_else(|| canonical.as_os_str()), - ); + let dep_canonical = original_project_source_dir.join(&buf).canonicalize()?; + let dep_location = local_deps_cache.join(name); + + // check if the dependency is an ancestor of the project + let is_ancestor = project_canonical + .ancestors() + .any(|p| p == dep_canonical.as_path()); // check that we don't have a naming error // todo: automatically handle naming conflicts by mangling the dep for one @@ -48,18 +60,34 @@ pub fn edit_cargo_file(cargo_path: &Path, function_name: &str) -> anyhow::Result } if dep_location.exists() { - // dependency already exists, does it point to the correct place? - if canonical == dep_location.canonicalize()? { - // output already points to the right place, do nothing - } else { - // output points somewhere else; delete it; if it's non-empty, error - fs::remove_dir(&dep_location).unwrap(); - symlink_dir(&canonical, &dep_location)?; + if dep_canonical == dep_location.canonicalize()? { + // output already points to the right place, presumably because a previous + // turbolift build already created a symlink. No need to alter the + // dependency cache, just point to the cache in the manifest and move on. + *buf = PathBuf::from_str(".")?.join(local_deps_dir_name).join(name); + continue; } + + // the dependency cache is not correct. We should delete what's currently there + // (note: symlinks will be removed, but the original files they link to will + // not be altered). + fs::remove_dir_all(&dep_location)?; + } + + if !is_ancestor { + symlink_dir(&dep_canonical, &dep_location)?; } else { - symlink_dir(&canonical, &dep_location)?; + // copy instead of symlinking here to avoid a symlink loop that will confuse and + // break the tar packer / unpacker. 
+            exclusive_recursive_copy(
+                dep_canonical.as_path(),
+                dep_location.as_path(),
+                vec![project_canonical.clone()].into_iter().collect(),
+                vec![OsStr::new("target")].into_iter().collect(),
+            )?;
+        }
-        *buf = dep_location.canonicalize().unwrap();
+
+        *buf = PathBuf::from_str(".")?.join(local_deps_dir_name).join(name);
         }
     }

@@ -79,6 +107,42 @@
     Ok(())
 }

+/// recursively copy a directory to a target, excluding a path and its
+/// children if it exists as a descendant of the source_dir.
+fn exclusive_recursive_copy(
+    source_dir: &Path,
+    target_dir: &Path,
+    exclude_paths: HashSet<PathBuf>,
+    exclude_file_names: HashSet<&OsStr>,
+) -> anyhow::Result<()> {
+    fs::create_dir_all(target_dir)?;
+    let source_dir_canonical = source_dir.to_path_buf().canonicalize()?;
+    let mut to_check = fs::read_dir(source_dir_canonical.as_path())?.collect::<VecDeque<_>>();
+    while !to_check.is_empty() {
+        let entry = to_check.pop_front().unwrap()?;
+        let entry_path = entry.path();
+        if exclude_paths.contains(&entry_path)
+            || entry_path
+                .file_name()
+                .map_or(false, |f| exclude_file_names.contains(f))
+        {
+            // skip the excluded path (and, if it has any, all of its children)
+        } else {
+            let relative_entry_path = entry_path.strip_prefix(source_dir_canonical.as_path())?;
+            let output = target_dir.join(relative_entry_path);
+
+            if entry.metadata()?.is_dir() {
+                to_check.append(&mut fs::read_dir(entry.path())?.collect::<VecDeque<_>>());
+                fs::create_dir_all(output)?;
+            } else {
+                fs::copy(entry_path, output)?;
+            }
+        }
+    }
+    Ok(())
+}
+
+#[tracing::instrument]
 pub fn lint(proj_path: &Path) -> anyhow::Result<()> {
     let install_status = Command::new("rustup")
         .args("component add rustfmt".split(' '))
@@ -99,10 +163,11 @@
     Ok(())
 }

+#[tracing::instrument]
 pub fn make_executable(proj_path: &Path, dest: Option<&Path>) -> anyhow::Result<()> {
     let status = Command::new("cargo")
         .current_dir(proj_path)
-        .args("build --release --features \"distributed,service\"".split(' '))
+        .args(format!("build{}", RELEASE_FLAG).as_str().trim().split(' '))
         .status()?;

     if !status.success() {
@@ -116,7 +181,11 @@ pub fn make_executable(proj_path: &Path, dest: Option<&Path>) -> anyhow::Result<()> {
         let cargo_path = proj_path.join("Cargo.toml");
         let parsed_toml: cargo_toml2::CargoToml = cargo_toml2::from_path(cargo_path)?;
         let project_name = parsed_toml.package.name;
-        let local_path = "target/release/".to_string() + &project_name;
+        let local_path = if IS_RELEASE {
+            "target/release/".to_string() + &project_name
+        } else {
+            "target/debug/".to_string() + &project_name
+        };
         proj_path.canonicalize().unwrap().join(&local_path)
     };
     fs::rename(&executable_path, destination)?;
@@ -124,6 +193,7 @@
     Ok(())
 }

+#[tracing::instrument]
 pub fn check(proj_path: &Path) -> anyhow::Result<()> {
     let status = Command::new("cargo")
         .current_dir(proj_path)
diff --git a/turbolift_internals/src/distributed_platform.rs b/turbolift_internals/src/distributed_platform.rs
index 22aab0ba..ba0bfce7 100644
--- a/turbolift_internals/src/distributed_platform.rs
+++ b/turbolift_internals/src/distributed_platform.rs
@@ -11,7 +11,7 @@ pub type JsonResponse = String;
 #[async_trait]
 pub trait DistributionPlatform {
     /// declare a function
-    fn declare(&mut self, function_name: &str, project_tar: &[u8]);
+    async fn declare(&mut self, function_name: &str, project_tar: &[u8]) -> DistributionResult<()>;

     // dispatch params to a function
     async fn dispatch(
         &mut self,
         function_name: &str,
         params: ArgsString,
     ) -> DistributionResult<JsonResponse>;
+
+    fn has_declared(&self, fn_name: &str) -> bool;
 }
diff --git a/turbolift_internals/src/extract_function.rs b/turbolift_internals/src/extract_function.rs
index 583bb8cc..0808dcf1 100644
--- a/turbolift_internals/src/extract_function.rs
+++ b/turbolift_internals/src/extract_function.rs
@@ -1,24 +1,24 @@
-use proc_macro2::TokenStream;
+use std::collections::VecDeque;
 use std::fs;
 use std::io::Cursor;
 use std::path::{Path, PathBuf};
 use std::str::FromStr;

+use proc_macro2::TokenStream as TokenStream2;
 use quote::ToTokens;
-use tar::{Archive, Builder};
-
-use syn::export::TokenStream2;
 use syn::spanned::Spanned;
+use tar::{Archive, Builder};

 use crate::distributed_platform::DistributionResult;
-use crate::CACHE_PATH;
-use std::collections::VecDeque;

 type TypedParams = syn::punctuated::Punctuated<syn::FnArg, syn::Token![,]>;
 type UntypedParams = syn::punctuated::Punctuated<Box<syn::Pat>, syn::Token![,]>;
 type ParamTypes = syn::punctuated::Punctuated<Box<syn::Type>, syn::Token![,]>;

-pub fn get_fn_item(function: TokenStream) -> syn::ItemFn {
+const IGNORED_DIRECTORIES: [&str; 3] = ["target", ".git", ".turbolift"];
+
+#[tracing::instrument]
+pub fn get_fn_item(function: TokenStream2) -> syn::ItemFn {
     match syn::parse2(function).unwrap() {
         syn::Item::Fn(fn_item) => fn_item,
         _ => panic!("token stream does not represent function."),
@@ -28,6 +28,7 @@
 /// wraps any calls to the target function from within its own service with the return type as
 /// if the call was made from outside the service. This is one way to allow compilation while
 /// references to the target function are in the service codebase.
+#[tracing::instrument]
 pub fn make_dummy_function(
     function: syn::ItemFn,
     redirect_fn_name: &str,
@@ -71,6 +72,7 @@
     }
 }

+#[tracing::instrument]
 pub fn to_untyped_params(typed_params: TypedParams) -> UntypedParams {
     typed_params
         .into_iter()
@@ -80,7 +82,9 @@
         })
         .collect()
 }
+
 /// params -> {param1}/{param2}/{param3}[...]
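+/// e.g. the params `a, b` become the actix route suffix `{a}/{b}`.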
+#[tracing::instrument]
 pub fn to_path_params(untyped_params: UntypedParams) -> String {
     let open_bracket = "{";
     let close_bracket = "}".to_string();
@@ -92,6 +96,7 @@
     path_format.join("/")
 }

+#[tracing::instrument]
 pub fn to_param_types(typed_params: TypedParams) -> ParamTypes {
     typed_params
         .into_iter()
@@ -102,7 +107,8 @@
         .collect()
 }

-pub fn params_json_vec(untyped_params: UntypedParams) -> TokenStream {
+#[tracing::instrument]
+pub fn params_json_vec(untyped_params: UntypedParams) -> TokenStream2 {
     let punc: Vec<String> = untyped_params
         .into_iter()
         .map(|pat| {
@@ -113,10 +119,11 @@
         .collect();
     let vec_string = format!("vec![{}]", punc.join(", "));

-    TokenStream::from_str(&vec_string).unwrap()
+    TokenStream2::from_str(&vec_string).unwrap()
 }

-pub fn get_sanitized_file(function: &TokenStream) -> TokenStream {
+#[tracing::instrument]
+pub fn get_sanitized_file(function: &TokenStream2) -> TokenStream2 {
     let span = function.span();
     let path = span.source_file().path();
     let start_line = match span.start().line {
@@ -150,12 +157,14 @@
     TokenStream2::from_str(&sanitized_string).unwrap()
 }

-pub fn unpack_path_params(untyped_params: &UntypedParams) -> TokenStream {
+#[tracing::instrument]
+pub fn unpack_path_params(untyped_params: &UntypedParams) -> TokenStream2 {
     let n_params = untyped_params.len();
     let params: Vec<String> = (0..n_params).map(|i| format!("path.{}", i)).collect();
-    TokenStream::from_str(&params.join(", ")).unwrap()
+    TokenStream2::from_str(&params.join(", ")).unwrap()
 }

+#[tracing::instrument]
 pub fn make_compressed_proj_src(dir: &Path) -> Vec<u8> {
     let cursor = Cursor::new(Vec::new());
     let mut archive = Builder::new(cursor);
     let mut entries: VecDeque<(PathBuf, fs::DirEntry)> = fs::read_dir(dir)
         .unwrap()
         .filter_map(Result::ok)
         .map(|entry| (dir.file_name().unwrap().into(), entry))
         .collect(); // ignore read errors
-    archive.append_dir(dir.file_name().unwrap(), dir).unwrap();
+    let tar_project_base_dir = dir.file_name().unwrap();
+
+    archive.append_dir(tar_project_base_dir, dir).unwrap();
     while !entries.is_empty() {
         let (entry_parent, entry) = entries.pop_front().unwrap();
-        if entry.file_name().to_str() == Some("target") && entry.metadata().unwrap().is_dir() {
-            // in target directories, only pass release (if it exists)
-            let release_deps = entry.path().join("release/deps");
-            if release_deps.exists() {
-                let path = {
-                    if entry_parent == dir {
-                        PathBuf::from_str("target/release").unwrap()
-                    } else {
-                        entry_parent.join("target").join("release")
-                    }
-                };
-                archive.append_dir_all(path, release_deps).unwrap();
-            }
+        if entry.metadata().unwrap().is_dir()
+            && (IGNORED_DIRECTORIES // todo could there be cases where removing .git messes up a dependency?
+                .contains(&entry.file_name().to_str().unwrap_or("")))
+        {
+            // ignore target and .git repository
         } else {
             let entry_path_with_parent = entry_parent.join(entry.file_name());
             if entry.path().is_dir() {
                 // ^ bug: the metadata on symlinks sometimes say that they are not directories,
                 // so we have to metadata.is_dir() || (metadata.file_type().is_symlink() && entry.path().is_dir() )
-                if CACHE_PATH.file_name().unwrap() != entry.file_name() {
+                if IGNORED_DIRECTORIES.contains(&entry.file_name().to_str().unwrap_or("")) {
+                    // don't include any target or .turbolift directories
+                } else {
                     archive
                         .append_dir(&entry_path_with_parent, entry.path())
                         .unwrap();
                     entries.extend(
                         fs::read_dir(entry.path())
                             .unwrap()
                             .filter_map(Result::ok)
-                            .map(|child| (entry_path_with_parent.clone(), child)),
+                            .map(|child| (entry_parent.join(entry.file_name()), child)),
                     )
-                } else {
-                    // don't include the cache
                 }
             } else {
                 let mut f = fs::File::open(entry.path()).unwrap();
@@ -210,6 +213,7 @@
     archive.into_inner().unwrap().into_inner()
 }

+#[tracing::instrument(skip(src))]
 pub fn decompress_proj_src(src: &[u8], dest: &Path) -> DistributionResult<()> {
     let cursor = Cursor::new(src.to_owned());
     let mut archive = Archive::new(cursor);
@@ -217,6 +221,7 @@
 }

 /// assumes input is a function, not a closure.
+#[tracing::instrument]
 pub fn get_result_type(output: &syn::ReturnType) -> TokenStream2 {
     match output {
         syn::ReturnType::Default => TokenStream2::from_str("()").unwrap(),
diff --git a/turbolift_internals/src/kubernetes.rs b/turbolift_internals/src/kubernetes.rs
index 00bca357..5f288a9e 100644
--- a/turbolift_internals/src/kubernetes.rs
+++ b/turbolift_internals/src/kubernetes.rs
@@ -1 +1,413 @@
-pub struct K8s {}
+use std::collections::HashMap;
+use std::io::Write;
+use std::process::{Command, Stdio};
+use std::str::FromStr;
+
+use async_trait::async_trait;
+use derivative::Derivative;
+use k8s_openapi::api::apps::v1::Deployment;
+use k8s_openapi::api::core::v1::Service;
+use kube::api::{Api, PostParams};
+use kube::Client;
+use regex::Regex;
+use tokio::time::{sleep, Duration};
+use tokio_compat_02::FutureExt;
+use url::Url;
+use uuid::Uuid;
+
+use crate::distributed_platform::{
+    ArgsString, DistributionPlatform, DistributionResult, JsonResponse,
+};
+use crate::utils::{DEBUG_FLAG, RELEASE_FLAG};
+use crate::CACHE_PATH;
+
+const TURBOLIFT_K8S_NAMESPACE: &str = "default";
+type ImageTag = String;
+type DeployContainerFunction = Box<dyn Fn(&str) -> anyhow::Result<&str> + Send + 'static>;
+
+pub const CONTAINER_PORT: i32 = 5678;
+pub const SERVICE_PORT: i32 = 5678;
+pub const EXTERNAL_PORT: i32 = 80;
+pub const TARGET_ARCHITECTURE: Option<&str> = None;
+// ^ todo: we want the user to be able to specify something like `Some("x86_64-unknown-linux-musl")`
+// during config, but right now that doesn't work because we are relying on super unstable
+// span features to extract functions into services. When we can enable statically linked
+// targets, we can use the multi-stage build path and significantly reduce the size.
+
+#[derive(Derivative)]
+#[derivative(Debug)]
+/// `K8s` is the interface for turning rust functions into autoscaling microservices
+/// using turbolift. It requires docker and kubernetes / kubectl to already be setup on the
+/// device at runtime.
+///
+/// Access to the kubernetes cluster must be inferrable from the env variables at runtime
+/// per kube-rs's
+/// [try_default()](https://docs.rs/kube/0.56.0/kube/client/struct.Client.html#method.try_default).
+pub struct K8s {
+    max_scale_n: u32,
+    fn_names_to_ips: HashMap<String, Url>,
+    request_client: reqwest::Client,
+    run_id: Uuid,
+
+    #[derivative(Debug = "ignore")]
+    /// A function called after the image is built locally via docker. deploy_container
+    /// receives the tag for the local image (accessible in docker) and is responsible
+    /// for making said image accessible to the target cluster. The output of
+    /// deploy_container is the tag that Kubernetes can use to refer to and access the
+    /// image throughout the cluster.
+    ///
+    /// Some examples of how this function can be implemented: uploading the image to
+    /// the cluster's private registry, uploading the image publicly to docker hub
+    /// (if the image is not sensitive), loading the image into KinD in tests.
+    deploy_container: DeployContainerFunction,
+}
+
+impl K8s {
+    /// returns a K8s object. If max is equal to 1, then autoscaling
+    /// is not enabled. Otherwise, autoscale is automatically activated
+    /// with cluster defaults and a max number of replicas *per distributed
+    /// function* of `max`. Panics if `max` < 1.
+    ///
+    /// The deploy container function is used for making containers accessible
+    /// to the cluster. See [`K8s::deploy_container`].
+    #[tracing::instrument(skip(deploy_container))]
+    pub fn with_deploy_function_and_max_replicas(
+        deploy_container: DeployContainerFunction,
+        max: u32,
+    ) -> K8s {
+        if max < 1 {
+            panic!("max < 1 while instantiating k8s (value: {})", max)
+        }
+        K8s {
+            deploy_container,
+            max_scale_n: max,
+            fn_names_to_ips: HashMap::new(),
+            request_client: reqwest::Client::new(),
+            run_id: Uuid::new_v4(),
+        }
+    }
+}
+
+fn sanitize_function_name(function_name: &str) -> String {
+    function_name.to_string().replace("_", "-")
+}
+
+#[async_trait]
+impl DistributionPlatform for K8s {
+    #[tracing::instrument(skip(project_tar))]
+    async fn declare(&mut self, function_name: &str, project_tar: &[u8]) -> DistributionResult<()> {
+        // connect to cluster. tries in-cluster configuration first, then falls back to kubeconfig file.
+        let deployment_client = Client::try_default().compat().await?;
+        let deployments: Api<Deployment> =
+            Api::namespaced(deployment_client, TURBOLIFT_K8S_NAMESPACE);
+        let service_client = Client::try_default().compat().await?;
+        let services: Api<Service> = Api::namespaced(service_client, TURBOLIFT_K8S_NAMESPACE);
+
+        // generate image & push
+        let app_name = format!("{}-{}", sanitize_function_name(function_name), self.run_id);
+        let container_name = format!("{}-app", app_name);
+        let deployment_name = format!("{}-deployment", app_name);
+        let service_name = format!("{}-service", app_name);
+        let ingress_name = format!("{}-ingress", app_name);
+        let tag_in_reg = make_image(self, &app_name, function_name, project_tar)?;
+
+        // make deployment
+        let deployment_json = serde_json::json!({
+            "apiVersion": "apps/v1",
+            "kind": "Deployment",
+            "metadata": {
+                "name": deployment_name,
+                "labels": {
+                    "turbolift_run_id": self.run_id.to_string()
+                }
+            },
+            "spec": {
+                "selector": {
+                    "matchLabels": {
+                        "app": app_name
+                    }
+                },
+                "replicas": 1,
+                "template": {
+                    "metadata": {
+                        "name": format!("{}-app", app_name),
+                        "labels": {
+                            "app": app_name,
+                            "turbolift_run_id": self.run_id.to_string(),
+                        }
+                    },
+                    "spec": {
+                        "containers": [
+                            {
+                                "name": container_name,
+                                "image": tag_in_reg
+                            }
+                        ]
+                    }
+                }
+            }
+        });
+        let deployment = serde_json::from_value(deployment_json)?;
+        deployments
+            .create(&PostParams::default(), &deployment)
+            .compat()
+            .await?;
+
+        // make service pointing to deployment
+        let service_json = serde_json::json!({
+            "apiVersion": "v1",
+            "kind": "Service",
+            "metadata": {
+                "name": service_name,
+                "labels": {
+                    "turbolift_run_id": self.run_id.to_string(),
+                }
+            },
+            "spec": {
+                "selector": {
+                    "app": app_name
+                },
+                "ports": [{
+                    "port": SERVICE_PORT,
+                    "targetPort": CONTAINER_PORT,
+                }]
+            }
+        });
+        let service = serde_json::from_value(service_json)?;
+        services
+            .create(&PostParams::default(), &service)
+            .compat()
+            .await?;
+
+        // make ingress pointing to service
+        let ingress = serde_json::json!({
+            "apiVersion": "networking.k8s.io/v1",
+            "kind": "Ingress",
+            "metadata": {
+                "name": ingress_name,
+                "labels": {
+                    "turbolift_run_id": self.run_id.to_string(),
+                }
+            },
+            "spec": {
+                "rules": [
+                    {
+                        "http": {
+                            "paths": [
+                                {
+                                    "path": format!("/{}/{}", function_name, self.run_id),
+                                    "pathType": "Prefix",
+                                    "backend": {
+                                        "service" : {
+                                            "name": service_name,
+                                            "port": {
+                                                "number": SERVICE_PORT
+                                            }
+                                        }
+                                    }
+                                }
+                            ]
+                        }
+                    }
+                ]
+            }
+        });
+
+        let mut apply_ingress_child = Command::new("kubectl")
+            .args("apply -f -".split(' '))
+            .stdin(Stdio::piped())
+            .spawn()?;
+        apply_ingress_child
+            .stdin
+            .as_mut()
+            .expect("not able to write to ingress apply stdin")
+            .write_all(ingress.to_string().as_bytes())?;
+        if !apply_ingress_child.wait()?.success() {
+            panic!(
+                "failed to apply ingress: {}\nis ingress enabled on this cluster?",
+                ingress.to_string()
+            )
+        }
+
+        let ingress_ip = format!(
+            "http://localhost:{}/{}/{}/",
+            EXTERNAL_PORT, function_name, self.run_id
+        ); // we assume for now that the ingress is exposed on localhost
+
+        if self.max_scale_n > 1 {
+            // set autoscale
+            let scale_args = format!(
+                "autoscale deployment {} --max={}",
+                deployment_name, self.max_scale_n
+            );
+            let scale_status = Command::new("kubectl")
+                .args(scale_args.as_str().split(' '))
+                .status()?;
+
+            if !scale_status.success() {
+                return Err(anyhow::anyhow!(
+                    "autoscale error: error code: {:?}",
+                    scale_status.code()
+                )
+                .into());
+                // ^ todo attach error context from child
+            }
+        }
+
+        sleep(Duration::from_secs(90)).await;
+        // todo make sure that the pod and service were correctly started before returning
+        // todo implement the check on whether the service is running / pod failed
+
+        self.fn_names_to_ips
+            .insert(function_name.to_string(), Url::from_str(&ingress_ip)?);
+        Ok(())
+    }
+
+    #[tracing::instrument]
+    async fn dispatch(
+        &mut self,
+        function_name: &str,
+        params: ArgsString,
+    ) -> DistributionResult<JsonResponse> {
+        // request from server
+        let service_base_url = self.fn_names_to_ips.get(function_name).unwrap();
+        let args = format!("./{}", params);
+        let query_url = service_base_url.join(&args)?;
+        tracing::info!(url = query_url.as_str(), "sending dispatch request");
+        Ok(self
+            .request_client
+            .get(query_url)
+            .send()
+            .compat()
+            .await?
+            .text()
+            .compat()
+            .await?)
+    }
+
+    #[tracing::instrument]
+    fn has_declared(&self, fn_name: &str) -> bool {
+        self.fn_names_to_ips.contains_key(fn_name)
+    }
+}
+
+lazy_static! {
+    static ref PORT_RE: Regex = Regex::new(r"0\.0\.0\.0:(\d+)->").unwrap();
+}
+
+#[tracing::instrument(skip(project_tar))]
+fn make_image(
+    k8s: &K8s,
+    app_name: &str,
+    function_name: &str,
+    project_tar: &[u8],
+) -> anyhow::Result<ImageTag> {
+    // todo: we should add some random stuff to the function_name to avoid collisions and figure
+    // out when to overwrite vs not.
+
+    tracing::info!("making image");
+    // set up directory and dockerfile
+    let build_dir = CACHE_PATH.join(format!("{}_k8s_temp_dir", app_name).as_str());
+    std::fs::create_dir_all(&build_dir)?;
+    let build_dir_canonical = build_dir.canonicalize()?;
+    let dockerfile_path = build_dir_canonical.join("Dockerfile");
+    let tar_file_name = "source.tar";
+    let tar_path = build_dir_canonical.join(tar_file_name);
+    let docker_file = format!(
+        "FROM ubuntu:latest as builder
+# set timezone (otherwise tzinfo stops dep installation with prompt for time zone)
+ENV TZ=Etc/UTC
+RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
+
+# install curl and rust deps
+RUN apt-get update && apt-get install -y curl gcc libssl-dev pkg-config && rm -rf /var/lib/apt/lists/*
+
+# install rustup
+RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain nightly
+ENV PATH=/root/.cargo/bin:$PATH
+
+# copy tar file
+COPY {tar_file_name} {tar_file_name}
+
+# unpack tar
+RUN cat {tar_file_name} | tar xvf -
+
+# enter into unpacked source directory
+WORKDIR {function_name}
+
+# build and run according to compilation scheme
+ENV RUSTFLAGS='--cfg procmacro2_semver_exempt'
+{compilation_scheme}",
+        function_name=function_name,
+        tar_file_name=tar_file_name,
+        compilation_scheme={
+            if let Some(architecture) = TARGET_ARCHITECTURE {
+                format!("# install the project binary with the given architecture.
+RUN rustup target add {architecture}
+RUN cargo install{debug_flag} --target {architecture} --path .
+
+# copy the binary from the builder, leaving the build environment.
+FROM scratch
+COPY --from=builder /usr/local/cargo/bin/{function_name} .
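+# exec-form CMD: the scratch image has no shell, only the binary copied above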
+CMD [\"./{function_name}\", \"0.0.0.0:{container_port}\"]", + architecture=architecture, + debug_flag=DEBUG_FLAG, + function_name=function_name, + container_port=CONTAINER_PORT + ) + } else { + format!( + "RUN cargo build{release_flag} + CMD cargo run{release_flag} -- 0.0.0.0:{container_port}", + release_flag=RELEASE_FLAG, + container_port=CONTAINER_PORT + ) + } + } + ); + std::fs::write(&dockerfile_path, docker_file)?; + std::fs::write(&tar_path, project_tar)?; + let unique_tag = format!("{}:turbolift", app_name); + + let result = (|| { + // build image + let build_cmd = format!( + "build -t {} {}", + unique_tag, + build_dir_canonical.to_string_lossy() + ); + let build_status = Command::new("docker") + .args(build_cmd.as_str().split(' ')) + .status()?; + + // make sure that build completed successfully + if !build_status.success() { + return Err(anyhow::anyhow!("docker image build failure")); + } + + Ok(unique_tag.clone()) + })(); + // always remove the build directory, even on build error + std::fs::remove_dir_all(build_dir_canonical)?; + + result.and((k8s.deploy_container)(unique_tag.as_str()).map(|s| s.to_string())) +} + +impl Drop for K8s { + #[tracing::instrument] + fn drop(&mut self) { + let status = Command::new("kubectl") + .args( + format!( + "delete pods,deployments,services,ingress -l turbolift_run_id={}", + self.run_id.to_string() + ) + .split(' '), + ) + .status() + .expect("could not delete Kubernetes resources"); + if !status.success() { + eprintln!("could not delete Kubernetes resources") + } + } +} diff --git a/turbolift_internals/src/lib.rs b/turbolift_internals/src/lib.rs index 586cdb01..74b697f1 100644 --- a/turbolift_internals/src/lib.rs +++ b/turbolift_internals/src/lib.rs @@ -5,10 +5,11 @@ use std::path::Path; pub mod build_project; pub mod distributed_platform; pub mod extract_function; -pub mod utils; -pub mod local_queue; pub mod kubernetes; +pub mod local_queue; +pub mod utils; pub use serde_json; +pub use uuid; lazy_static! { /// CACHE_PATH is the directory where turbolift stores derived projects, diff --git a/turbolift_internals/src/local_queue.rs b/turbolift_internals/src/local_queue.rs index a3ac54e3..af0eeaf7 100644 --- a/turbolift_internals/src/local_queue.rs +++ b/turbolift_internals/src/local_queue.rs @@ -2,11 +2,11 @@ extern crate proc_macro; use std::collections::HashMap; use std::fs; use std::path::Path; -use std::thread::sleep; use std::time::Duration; use async_trait::async_trait; use std::process::{Child, Command}; +use tokio_compat_02::FutureExt; use url::Url; use crate::build_project::make_executable; @@ -15,15 +15,18 @@ use crate::distributed_platform::{ }; use crate::extract_function::decompress_proj_src; use crate::CACHE_PATH; +use uuid::Uuid; type AddressAndPort = Url; type FunctionName = String; -#[derive(Default)] +#[derive(Default, Debug)] pub struct LocalQueue { fn_name_to_address: HashMap, // todo hardcoded rn fn_name_to_process: HashMap, fn_name_to_binary_path: HashMap, + request_client: reqwest::Client, + run_id: Uuid, } impl LocalQueue { @@ -35,31 +38,33 @@ impl LocalQueue { #[async_trait] impl DistributionPlatform for LocalQueue { /// declare a function. Runs once. 
-    fn declare(&mut self, function_name: &str, project_tar: &[u8]) {
+    #[tracing::instrument(skip(project_tar))]
+    async fn declare(&mut self, function_name: &str, project_tar: &[u8]) -> DistributionResult<()> {
         let relative_build_dir = Path::new(".")
             .join(".turbolift")
             .join(".worker_build_cache");
-        fs::create_dir_all(&relative_build_dir).unwrap();
-        let build_dir = relative_build_dir.canonicalize().unwrap();
+        fs::create_dir_all(&relative_build_dir)?;
+        let build_dir = relative_build_dir.canonicalize()?;
         decompress_proj_src(project_tar, &build_dir).unwrap();
-        let function_executable =
-            Path::new(CACHE_PATH.as_os_str()).join(function_name.to_string() + "_server");
-        make_executable(&build_dir.join(function_name), Some(&function_executable)).unwrap();
+        let function_executable = Path::new(CACHE_PATH.as_os_str()).join(format!(
+            "{}_{}_server",
+            function_name.to_string(),
+            self.run_id.as_u128()
+        ));
+        make_executable(&build_dir.join(function_name), Some(&function_executable))?;
         self.fn_name_to_binary_path
             .insert(function_name.to_string(), function_executable);
         //std::fs::remove_dir_all(build_dir.join(function_name)).unwrap(); todo
+        Ok(())
     }

     // dispatch params to a function. Runs each time the function is called.
+    #[tracing::instrument]
     async fn dispatch(
         &mut self,
         function_name: &str,
         params: ArgsString,
     ) -> DistributionResult<JsonResponse> {
-        async fn get(query_url: Url) -> String {
-            surf::get(query_url).recv_string().await.unwrap()
-        }
-
         let address_and_port = {
             if self.fn_name_to_address.contains_key(function_name) {
                 // the server is already initialized.
@@ -73,12 +78,14 @@
                 let server_url: AddressAndPort =
                     Url::parse(&("http://".to_string() + server_address_and_port_str))?;
                 let executable = self.fn_name_to_binary_path.get(function_name).unwrap();
+                tracing::info!("spawning");
                 let server_handle = Command::new(executable)
                     .arg(&server_address_and_port_str)
                     .spawn()?;
-                sleep(Duration::from_secs(30));
+                tracing::info!("delaying");
+                tokio::time::sleep(Duration::from_secs(60)).await;
+                tracing::info!("delay completed");
                 // ^ sleep to make sure the server is initialized before continuing
-                // todo: here and with the GET request, futures hang indefinitely. To investigate.
                 self.fn_name_to_address
                     .insert(function_name.to_string(), server_url.clone());
                 self.fn_name_to_process
@@ -88,16 +95,30 @@
         };

         // request from server
-        let prefixed_params = "./".to_string() + function_name + "/" + &params;
+        let prefixed_params = "./".to_string() + function_name + "/empty-uuid/" + &params;
         let query_url = address_and_port.join(&prefixed_params)?;
-        let response = async_std::task::block_on(get(query_url));
-        // ^ todo not sure why futures are hanging here unless I wrap them in a new block_on?
-        Ok(response)
+
+        tracing::info!("sending dispatch request");
+        Ok(self
+            .request_client
+            .get(query_url)
+            .send()
+            .compat()
+            .await?
+            .text()
+            .compat()
+            .await?)
 
 impl Drop for LocalQueue {
     /// terminate all servers when program is finished
+    #[tracing::instrument]
     fn drop(&mut self) {
         self.fn_name_to_process
             .drain()
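// Illustrative sketch (not part of the patch; field and type shapes assumed from
// the struct above): Drop drains `fn_name_to_process` and kills each spawned
// function server, so worker processes do not outlive the queue that started them.
use std::collections::HashMap;
use std::process::Child;

fn kill_spawned_servers(processes: &mut HashMap<String, Child>) {
    for (fn_name, mut child) in processes.drain() {
        // ignore failures: the server may already have exited on its own
        if let Err(e) = child.kill() {
            eprintln!("could not kill server for {}: {}", fn_name, e);
        }
    }
}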
diff --git a/turbolift_internals/src/utils.rs b/turbolift_internals/src/utils.rs
index 4abc4b86..85d49691 100644
--- a/turbolift_internals/src/utils.rs
+++ b/turbolift_internals/src/utils.rs
@@ -4,4 +4,25 @@ pub use std::os::unix::fs::symlink as symlink_dir;
 #[cfg(target_family = "windows")]
 pub use std::os::windows::fs::symlink_dir;
 
+#[cfg(not(debug_assertions))]
+pub const IS_RELEASE: bool = true;
+#[cfg(debug_assertions)]
+pub const IS_RELEASE: bool = false;
+
+/// is " --release" if built with release flag, otherwise empty string
+pub const RELEASE_FLAG: &str = {
+    if IS_RELEASE {
+        " --release"
+    } else {
+        ""
+    }
+};
+
+pub const DEBUG_FLAG: &str = {
+    if IS_RELEASE {
+        ""
+    } else {
+        " --debug"
+    }
+};
diff --git a/turbolift_macros/Cargo.toml b/turbolift_macros/Cargo.toml
index aac937be..c4016918 100644
--- a/turbolift_macros/Cargo.toml
+++ b/turbolift_macros/Cargo.toml
@@ -9,9 +9,16 @@ edition = "2018"
 [lib]
 proc-macro = true
 
+[features]
+"distributed" = []
+
 [dependencies]
 quote = "1"
 proc-macro2 = "1"
 fs_extra = "1"
 turbolift_internals = { path="../turbolift_internals" }
-futures = "0.3"
\ No newline at end of file
+futures = "0.3"
+cached = "0.19"
+tracing = {version="0.1", features=["attributes"]}
+tracing-futures = "0.2.4"
+syn = { version = "1", features=["full"] }
diff --git a/turbolift_macros/src/lib.rs b/turbolift_macros/src/lib.rs
index 1e9c8ad1..a931a337 100644
--- a/turbolift_macros/src/lib.rs
+++ b/turbolift_macros/src/lib.rs
@@ -1,11 +1,12 @@
 use proc_macro::TokenStream;
-use proc_macro2::TokenStream as TokenStream2;
+use proc_macro2::{Ident, Span, TokenStream as TokenStream2};
 use quote::quote as q;
 
 use turbolift_internals::extract_function;
 
 #[cfg(feature = "distributed")]
 #[proc_macro_attribute]
+#[tracing::instrument]
 pub fn on(distribution_platform_: TokenStream, function_: TokenStream) -> TokenStream {
     use quote::{format_ident, ToTokens};
     use std::fs;
@@ -14,6 +15,8 @@ pub fn on(distribution_platform_: TokenStream, function_: TokenStream) -> TokenS
 
     use turbolift_internals::{build_project, CACHE_PATH};
 
+    const RUN_ID_NAME: &str = "_turbolift_run_id";
+
     // convert proc_macro::TokenStream to proc_macro2::TokenStream
     let distribution_platform = TokenStream2::from(distribution_platform_);
     let function = TokenStream2::from(function_);
@@ -29,10 +32,34 @@ pub fn on(distribution_platform_: TokenStream, function_: TokenStream) -> TokenS
     let function_name_string = function_name.to_string();
     let typed_params = signature.inputs;
     let untyped_params = extract_function::to_untyped_params(typed_params.clone());
+    let mut untyped_params_with_run_id = untyped_params.clone();
+    // we need to prepend a variable for the run id, since it's in the URL.
+    untyped_params_with_run_id.insert(
+        0,
+        Box::new(syn::Pat::Ident(syn::PatIdent {
+            attrs: vec![],
+            by_ref: None,
+            mutability: None,
+            ident: Ident::new(RUN_ID_NAME, Span::call_site()),
+            subpat: None,
+        })),
+    );
+    let untyped_params_tokens_with_run_id = untyped_params_with_run_id.to_token_stream();
     let untyped_params_tokens = untyped_params.to_token_stream();
     let params_as_path = extract_function::to_path_params(untyped_params.clone());
-    let wrapper_route = "/".to_string() + &original_target_function_name + "/" + &params_as_path;
-    let param_types = extract_function::to_param_types(typed_params.clone());
+    let wrapper_route = format!(
+        "{}/{{{}}}/{}",
+        original_target_function_name, RUN_ID_NAME, &params_as_path
+    );
+    let mut param_types = extract_function::to_param_types(typed_params.clone());
+    // we need to prepend a type for the run id added to the wrapper route
+    param_types.insert(
+        0,
+        Box::new(syn::Type::Verbatim(
+            str::parse::<TokenStream2>("String")
+                .expect("could not parse \"String\" as a tokenstream"),
+        )),
+    );
     let params_vec = extract_function::params_json_vec(untyped_params.clone());
     let result_type = extract_function::get_result_type(&signature.output);
     let dummy_function = extract_function::make_dummy_function(
@@ -47,14 +74,16 @@ pub fn on(distribution_platform_: TokenStream, function_: TokenStream) -> TokenS
     let sanitized_file = extract_function::get_sanitized_file(&function);
     // todo make code below hygienic in case sanitized_file also imports from actix_web
     let main_file = q! {
-        use turbolift::actix_web::{self, get, web, HttpResponse, Result};
+        use turbolift::actix_web::{self, get, web, HttpResponse, HttpRequest, Result, Responder};
+        use turbolift::tokio_compat_02::FutureExt;
 
         #sanitized_file
         #dummy_function
         #target_function
 
         #[get(#wrapper_route)]
-        async fn turbolift_wrapper(web::Path((#untyped_params_tokens)): web::Path<(#param_types)>) -> Result<HttpResponse> {
+        #[turbolift::tracing::instrument]
+        async fn turbolift_wrapper(web::Path((#untyped_params_tokens_with_run_id)): web::Path<(#param_types)>) -> Result<HttpResponse> {
             Ok(
                 HttpResponse::Ok()
                     .json(#function_name(#untyped_params_tokens))
@@ -62,20 +91,32 @@ pub fn on(distribution_platform_: TokenStream, function_: TokenStream) -> TokenS
         }
 
         #[actix_web::main]
+        #[turbolift::tracing::instrument]
         async fn main() -> std::io::Result<()> {
-            use actix_web::{App, HttpServer};
+            use actix_web::{App, HttpServer, HttpRequest, web};
             let args: Vec<String> = std::env::args().collect();
             let ip_and_port = &args[1];
+            turbolift::tracing::info!("service main() started. ip_and_port parsed.");
             HttpServer::new(
                 ||
                     App::new()
                         .service(
                             turbolift_wrapper
                         )
+                        .default_service(
+                            web::get()
+                                .to(
+                                    |req: HttpRequest|
+                                        HttpResponse::NotFound().body(
+                                            format!("endpoint not found: {}", req.uri())
+                                        )
+                                )
+                        )
             )
                 .bind(ip_and_port)?
                 .run()
+                .compat()
                 .await
         }
     };
@@ -88,9 +129,10 @@ pub fn on(distribution_platform_: TokenStream, function_: TokenStream) -> TokenS
         .map(|res| res.expect("could not read entry").path())
        .filter(|path| path.file_name() != CACHE_PATH.file_name())
         .filter(
-            |path| path.to_str() != Some("./target"), // todo we could shorten compile time by sharing deps in ./target,
-            // but I didn't have the bandwidth to debug permissions errors caused
-            // by copying all of the compiled lib files.
+            |path| path.to_str() != Some("./target"),
+            // todo we could shorten compile time by sharing deps in ./target,
+            // but I didn't have the bandwidth to debug permissions errors caused
+            // by copying all of the compiled lib files.
) .collect(); fs_extra::copy_items( @@ -109,25 +151,33 @@ pub fn on(distribution_platform_: TokenStream, function_: TokenStream) -> TokenS // modify cargo.toml (edit package info & add actix + json_serde deps) build_project::edit_cargo_file( + PathBuf::from_str(".") + .expect("could not find project dir") + .canonicalize() + .expect("could not canonicalize path to project dir") + .as_path(), &function_cache_proj_path.join("Cargo.toml"), &original_target_function_name, ) .expect("error editing cargo file"); // lint project - build_project::lint(&function_cache_proj_path).expect("linting error"); + if let Err(e) = build_project::lint(&function_cache_proj_path) { + tracing::error!( + error = e.as_ref() as &(dyn std::error::Error + 'static), + "ignoring linting error" + ); + } - // check project and give errors - build_project::check(&function_cache_proj_path).expect("error checking function"); + // // check project and give errors + // build_project::check(&function_cache_proj_path).expect("error checking function"); + // println!("building microservice"); // // build project so that the deps are packaged, and if the worker has the same architecture, - // // they can directly use the compiled version without having to recompile. todo commented - // // out because the build artifacts are too large. - // build_project::make_executable( - // &function_cache_proj_path, - // None - // ).expect("error building function"); - // ^ todo + // // they can directly use the compiled version without having to recompile. todo the build artifacts are too large. + // build_project::make_executable(&function_cache_proj_path, None) + // .expect("error building function"); + // // ^ todo // compress project source files let project_source_binary = { @@ -150,31 +200,32 @@ pub fn on(distribution_platform_: TokenStream, function_: TokenStream) -> TokenS extern crate turbolift; // dispatch call and process response + #[turbolift::tracing::instrument] async fn #original_target_function_ident(#typed_params) -> turbolift::DistributionResult<#result_type> { use std::time::Duration; - use turbolift::DistributionPlatform; - use turbolift::async_std::task; - use turbolift::cached::proc_macro::cached; - - // call .declare once by memoizing the call - #[cached(size=1)] - fn setup() { - #distribution_platform - .lock() - .unwrap() - .declare(#original_target_function_name, #project_source_binary); + use turbolift::distributed_platform::DistributionPlatform; + use turbolift::DistributionResult; + use turbolift::tokio_compat_02::FutureExt; + use turbolift::uuid::Uuid; + + let mut platform = #distribution_platform.lock().await; + + if !platform.has_declared(#original_target_function_name) { + platform + .declare(#original_target_function_name, #project_source_binary) + .compat() + .await?; } - setup(); let params = #params_vec.join("/"); - - let resp_string = #distribution_platform - .lock()? + let resp_string = platform .dispatch( #original_target_function_name, params.to_string() - ).await?; + ) + .compat() + .await?; Ok(turbolift::serde_json::from_str(&resp_string)?) 
} }; @@ -184,8 +235,6 @@ pub fn on(distribution_platform_: TokenStream, function_: TokenStream) -> TokenS #[cfg(not(feature = "distributed"))] #[proc_macro_attribute] pub fn on(_distribution_platform: TokenStream, function_: TokenStream) -> TokenStream { - use proc_macro2::{Ident, Span}; - // convert proc_macro::TokenStream to proc_macro2::TokenStream let function = TokenStream2::from(function_); let mut wrapped_original_function = extract_function::get_fn_item(function); @@ -199,6 +248,7 @@ pub fn on(_distribution_platform: TokenStream, function_: TokenStream) -> TokenS let async_function = q! { extern crate turbolift; + #[turbolift::tracing::instrument] async fn #original_target_function_ident(#typed_params) -> turbolift::DistributionResult<#output_type> { #wrapped_original_function Ok(wrapped_function(#untyped_params))
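// Illustrative expansion sketch (not part of the patch; `add` and the concrete
// types are hypothetical): with the run id threaded through the URL, the wrapper
// generated for `fn add(x: u32, y: u32) -> u32` now matches a route shaped like
// add/{_turbolift_run_id}/{x}/{y}, mirroring the `wrapper_route` format above,
// and the run id arrives as the first element of the extracted path tuple.
use actix_web::{get, web, HttpResponse, Result};

fn add(x: u32, y: u32) -> u32 {
    x + y
}

#[get("add/{_turbolift_run_id}/{x}/{y}")]
async fn turbolift_wrapper(
    web::Path((_turbolift_run_id, x, y)): web::Path<(String, u32, u32)>,
) -> Result<HttpResponse> {
    // the run id segment is only used for routing and bookkeeping; the wrapped
    // function receives the remaining parameters
    Ok(HttpResponse::Ok().json(add(x, y)))
}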