Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multi trigger apps #53

Merged
merged 3 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 90 additions & 65 deletions containerd-shim-spin/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use containerd_shim_wasm::{
container::{Engine, RuntimeContext, Stdio},
sandbox::WasmLayer,
};
use futures::future;
use log::info;
use oci_spec::image::MediaType;
use spin_app::locked::LockedApp;
Expand Down Expand Up @@ -189,70 +190,91 @@ impl SpinEngine {
env::set_var("XDG_CACHE_HOME", &cache_dir);
let app_source = self.app_source(ctx, &cache).await?;
let resolved_app_source = self.resolve_app_source(app_source.clone(), &cache).await?;
let trigger_cmd = trigger_command_for_resolved_app_source(&resolved_app_source)
let trigger_cmds = trigger_command_for_resolved_app_source(&resolved_app_source)
.with_context(|| format!("Couldn't find trigger executor for {app_source:?}"))?;
let locked_app = self.load_resolved_app_source(resolved_app_source).await?;
self.run_trigger(ctx, &trigger_cmd, locked_app, app_source)
.await
self.run_trigger(
ctx,
trigger_cmds.iter().map(|s| s.as_ref()).collect(),
locked_app,
app_source,
)
.await
}

async fn run_trigger(
rajatjindal marked this conversation as resolved.
Show resolved Hide resolved
&self,
ctx: &impl RuntimeContext,
trigger_type: &str,
trigger_types: Vec<&str>,
app: LockedApp,
app_source: AppSource,
) -> Result<()> {
let working_dir = PathBuf::from("/");
let f = match trigger_type {
HttpTrigger::TRIGGER_TYPE => {
let http_trigger: HttpTrigger = self
.build_spin_trigger(working_dir, app, app_source)
.await
.context("failed to build spin trigger")?;

info!(" >>> running spin trigger");
http_trigger.run(spin_trigger_http::CliArgs {
address: parse_addr(SPIN_ADDR).unwrap(),
tls_cert: None,
tls_key: None,
})
}
RedisTrigger::TRIGGER_TYPE => {
let redis_trigger: RedisTrigger = self
.build_spin_trigger(working_dir, app, app_source)
.await
.context("failed to build spin trigger")?;

info!(" >>> running spin trigger");
redis_trigger.run(spin_trigger::cli::NoArgs)
}
SqsTrigger::TRIGGER_TYPE => {
let sqs_trigger: SqsTrigger = self
.build_spin_trigger(working_dir, app, app_source)
.await
.context("failed to build spin trigger")?;

info!(" >>> running spin trigger");
sqs_trigger.run(spin_trigger::cli::NoArgs)
}
CommandTrigger::TRIGGER_TYPE => {
let command_trigger: CommandTrigger = self
.build_spin_trigger(working_dir, app, app_source)
.await
.context("failed to build spin trigger")?;

info!(" >>> running spin trigger");
command_trigger.run(trigger_command::CliArgs {
guest_args: ctx.args().to_vec(),
})
}
_ => {
todo!("Only Http, Redis and SQS triggers are currently supported.")
}
};
let mut futures_list = Vec::with_capacity(trigger_types.len());
for trigger_type in trigger_types.iter() {
let f = match trigger_type.to_owned() {
HttpTrigger::TRIGGER_TYPE => {
let http_trigger: HttpTrigger = self
.build_spin_trigger(working_dir.clone(), app.clone(), app_source.clone())
.await
.context("failed to build spin trigger")?;

info!(" >>> running spin http trigger");
http_trigger.run(spin_trigger_http::CliArgs {
address: parse_addr(SPIN_ADDR).unwrap(),
tls_cert: None,
tls_key: None,
})
}
RedisTrigger::TRIGGER_TYPE => {
let redis_trigger: RedisTrigger = self
.build_spin_trigger(working_dir.clone(), app.clone(), app_source.clone())
.await
.context("failed to build spin trigger")?;

info!(" >>> running spin redis trigger");
redis_trigger.run(spin_trigger::cli::NoArgs)
}
SqsTrigger::TRIGGER_TYPE => {
let sqs_trigger: SqsTrigger = self
.build_spin_trigger(working_dir.clone(), app.clone(), app_source.clone())
.await
.context("failed to build spin trigger")?;

info!(" >>> running spin trigger");
sqs_trigger.run(spin_trigger::cli::NoArgs)
}
CommandTrigger::TRIGGER_TYPE => {
let command_trigger: CommandTrigger = self
.build_spin_trigger(working_dir.clone(), app.clone(), app_source.clone())
.await
.context("failed to build spin trigger")?;

info!(" >>> running spin trigger");
command_trigger.run(trigger_command::CliArgs {
guest_args: ctx.args().to_vec(),
})
}
_ => {
todo!("Only Http, Redis and SQS triggers are currently supported.")
}
};

futures_list.push(f)
}

info!(" >>> notifying main thread we are about to start");
f.await

// exit as soon as any of the trigger completes/exits
let (result, index, rest) = future::select_all(futures_list).await;
info!(
" >>> trigger type '{trigger_type}' exited",
trigger_type = trigger_types[index]
);

drop(rest);

result
}

async fn load_resolved_app_source(
Expand Down Expand Up @@ -435,7 +457,7 @@ pub enum ResolvedAppSource {
}

impl ResolvedAppSource {
pub fn trigger_type(&self) -> anyhow::Result<&str> {
pub fn trigger_types(&self) -> anyhow::Result<Vec<&str>> {
let types = match self {
ResolvedAppSource::File { manifest, .. } => {
manifest.triggers.keys().collect::<HashSet<_>>()
Expand All @@ -448,23 +470,26 @@ impl ResolvedAppSource {
};

ensure!(!types.is_empty(), "no triggers in app");
ensure!(types.len() == 1, "multiple trigger types not yet supported");
Ok(types.into_iter().next().unwrap())
Ok(types.into_iter().map(|t| t.as_str()).collect())
}
}

fn trigger_command_for_resolved_app_source(resolved: &ResolvedAppSource) -> Result<String> {
let trigger_type = resolved.trigger_type()?;

match trigger_type {
RedisTrigger::TRIGGER_TYPE
| HttpTrigger::TRIGGER_TYPE
| SqsTrigger::TRIGGER_TYPE
| CommandTrigger::TRIGGER_TYPE => Ok(trigger_type.to_owned()),
_ => {
todo!("Only Http, Redis, SQS, and command triggers are currently supported.")
fn trigger_command_for_resolved_app_source(resolved: &ResolvedAppSource) -> Result<Vec<String>> {
let trigger_types = resolved.trigger_types()?;
let mut types = Vec::with_capacity(trigger_types.len());
for trigger_type in trigger_types.iter() {
match trigger_type.to_owned() {
RedisTrigger::TRIGGER_TYPE
| HttpTrigger::TRIGGER_TYPE
| SqsTrigger::TRIGGER_TYPE
| CommandTrigger::TRIGGER_TYPE => types.push(trigger_type),
_ => {
todo!("Only Http, Redis and SQS triggers are currently supported.")
}
}
}

Ok(trigger_types.iter().map(|x| x.to_string()).collect())
}

#[cfg(test)]
Expand Down
15 changes: 15 additions & 0 deletions images/spin-multi-trigger-app/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
FROM --platform=${BUILDPLATFORM} rust:1.73 AS build
rajatjindal marked this conversation as resolved.
Show resolved Hide resolved
WORKDIR /opt/build
COPY . .
RUN rustup target add wasm32-wasi

WORKDIR /opt/build/spin-http-trigger
RUN cargo build --target wasm32-wasi --release

WORKDIR /opt/build/spin-redis-trigger
RUN cargo build --target wasm32-wasi --release

FROM scratch
COPY --from=build /opt/build/spin-http-trigger/target/wasm32-wasi/release/spin_http_trigger.wasm .
COPY --from=build /opt/build/spin-redis-trigger/target/wasm32-wasi/release/spin_redis_trigger.wasm .
COPY --from=build /opt/build/spin.toml .
2 changes: 2 additions & 0 deletions images/spin-multi-trigger-app/spin-http-trigger/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
target/
.spin/
Loading