From b19b85a582853b62501b9b73113bfdf95af85c1a Mon Sep 17 00:00:00 2001 From: Alejandro Martinez Ruiz Date: Thu, 29 Oct 2020 17:22:29 +0100 Subject: [PATCH] types: replace Action to mimic C++ SDK and add missing variants This is based on PR #39 upstream. Brings us a step closer to what the C++ SDK uses. Signed-off-by: Alejandro Martinez Ruiz --- examples/http_auth_random.rs | 8 +++--- examples/http_body.rs | 10 +++---- examples/http_headers.rs | 10 +++---- src/dispatcher.rs | 36 +++++++++++------------ src/traits.rs | 36 +++++++++++------------ src/types.rs | 56 ++++++++++++++++++++++++++++++++++-- 6 files changed, 104 insertions(+), 52 deletions(-) diff --git a/examples/http_auth_random.rs b/examples/http_auth_random.rs index b9e747c6..dbe9dc0f 100644 --- a/examples/http_auth_random.rs +++ b/examples/http_auth_random.rs @@ -26,7 +26,7 @@ pub fn _start() { struct HttpAuthRandom; impl HttpContext for HttpAuthRandom { - fn on_http_request_headers(&mut self, _: usize) -> Action { + fn on_http_request_headers(&mut self, _: usize) -> FilterHeadersStatus { self.dispatch_http_call( "httpbin", vec![ @@ -39,12 +39,12 @@ impl HttpContext for HttpAuthRandom { Duration::from_secs(5), ) .unwrap(); - Action::Pause + FilterHeadersStatus::StopIteration } - fn on_http_response_headers(&mut self, _: usize) -> Action { + fn on_http_response_headers(&mut self, _: usize) -> FilterHeadersStatus { self.set_http_response_header("Powered-By", Some("proxy-wasm")); - Action::Continue + FilterHeadersStatus::Continue } } diff --git a/examples/http_body.rs b/examples/http_body.rs index 5db8ed8f..333dbf77 100644 --- a/examples/http_body.rs +++ b/examples/http_body.rs @@ -26,20 +26,20 @@ struct HttpBody; impl Context for HttpBody {} impl HttpContext for HttpBody { - fn on_http_response_headers(&mut self, _: usize) -> Action { + fn on_http_response_headers(&mut self, _: usize) -> FilterHeadersStatus { // If there is a Content-Length header and we change the length of // the body later, then clients will break. So remove it. // We must do this here, because once we exit this function we // can no longer modify the response headers. self.set_http_response_header("content-length", None); - Action::Continue + FilterHeadersStatus::Continue } - fn on_http_response_body(&mut self, body_size: usize, end_of_stream: bool) -> Action { + fn on_http_response_body(&mut self, body_size: usize, end_of_stream: bool) -> FilterDataStatus { if !end_of_stream { // Wait -- we'll be called again when the complete body is buffered // at the host side. - return Action::Pause; + return FilterDataStatus::StopIterationAndBuffer; } // Replace the message body if it contains the text "secret". @@ -51,6 +51,6 @@ impl HttpContext for HttpBody { self.set_http_response_body(0, body_size, &new_body.into_bytes()); } } - Action::Continue + FilterDataStatus::Continue } } diff --git a/examples/http_headers.rs b/examples/http_headers.rs index 14b32861..ee4d0f13 100644 --- a/examples/http_headers.rs +++ b/examples/http_headers.rs @@ -31,7 +31,7 @@ struct HttpHeaders { impl Context for HttpHeaders {} impl HttpContext for HttpHeaders { - fn on_http_request_headers(&mut self, _: usize) -> Action { + fn on_http_request_headers(&mut self, _: usize) -> FilterHeadersStatus { for (name, value) in &self.get_http_request_headers() { trace!("#{} -> {}: {}", self.context_id, name, value); } @@ -43,17 +43,17 @@ impl HttpContext for HttpHeaders { vec![("Hello", "World"), ("Powered-By", "proxy-wasm")], Some(b"Hello, World!\n"), ); - Action::Pause + FilterHeadersStatus::StopIteration } - _ => Action::Continue, + _ => FilterHeadersStatus::Continue, } } - fn on_http_response_headers(&mut self, _: usize) -> Action { + fn on_http_response_headers(&mut self, _: usize) -> FilterHeadersStatus { for (name, value) in &self.get_http_response_headers() { trace!("#{} <- {}: {}", self.context_id, name, value); } - Action::Continue + FilterHeadersStatus::Continue } fn on_log(&mut self) { diff --git a/src/dispatcher.rs b/src/dispatcher.rs index b121e1c0..522acadd 100644 --- a/src/dispatcher.rs +++ b/src/dispatcher.rs @@ -262,7 +262,7 @@ impl Dispatcher { } } - fn on_new_connection(&self, context_id: u32) -> Action { + fn on_new_connection(&self, context_id: u32) -> FilterStatus { if let Some(stream) = self.streams.borrow_mut().get_mut(&context_id) { self.active_id.set(context_id); stream.on_new_connection() @@ -271,7 +271,7 @@ impl Dispatcher { } } - fn on_downstream_data(&self, context_id: u32, data_size: usize, end_of_stream: bool) -> Action { + fn on_downstream_data(&self, context_id: u32, data_size: usize, end_of_stream: bool) -> FilterStatus { if let Some(stream) = self.streams.borrow_mut().get_mut(&context_id) { self.active_id.set(context_id); stream.on_downstream_data(data_size, end_of_stream) @@ -289,7 +289,7 @@ impl Dispatcher { } } - fn on_upstream_data(&self, context_id: u32, data_size: usize, end_of_stream: bool) -> Action { + fn on_upstream_data(&self, context_id: u32, data_size: usize, end_of_stream: bool) -> FilterStatus { if let Some(stream) = self.streams.borrow_mut().get_mut(&context_id) { self.active_id.set(context_id); stream.on_upstream_data(data_size, end_of_stream) @@ -307,7 +307,7 @@ impl Dispatcher { } } - fn on_http_request_headers(&self, context_id: u32, num_headers: usize) -> Action { + fn on_http_request_headers(&self, context_id: u32, num_headers: usize) -> FilterHeadersStatus { if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) { self.active_id.set(context_id); http_stream.on_http_request_headers(num_headers) @@ -321,7 +321,7 @@ impl Dispatcher { context_id: u32, body_size: usize, end_of_stream: bool, - ) -> Action { + ) -> FilterDataStatus { if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) { self.active_id.set(context_id); http_stream.on_http_request_body(body_size, end_of_stream) @@ -330,7 +330,7 @@ impl Dispatcher { } } - fn on_http_request_trailers(&self, context_id: u32, num_trailers: usize) -> Action { + fn on_http_request_trailers(&self, context_id: u32, num_trailers: usize) -> FilterTrailersStatus { if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) { self.active_id.set(context_id); http_stream.on_http_request_trailers(num_trailers) @@ -339,7 +339,7 @@ impl Dispatcher { } } - fn on_http_response_headers(&self, context_id: u32, num_headers: usize) -> Action { + fn on_http_response_headers(&self, context_id: u32, num_headers: usize) -> FilterHeadersStatus { if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) { self.active_id.set(context_id); http_stream.on_http_response_headers(num_headers) @@ -353,7 +353,7 @@ impl Dispatcher { context_id: u32, body_size: usize, end_of_stream: bool, - ) -> Action { + ) -> FilterDataStatus { if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) { self.active_id.set(context_id); http_stream.on_http_response_body(body_size, end_of_stream) @@ -362,7 +362,7 @@ impl Dispatcher { } } - fn on_http_response_trailers(&self, context_id: u32, num_trailers: usize) -> Action { + fn on_http_response_trailers(&self, context_id: u32, num_trailers: usize) -> FilterTrailersStatus { if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) { self.active_id.set(context_id); http_stream.on_http_response_trailers(num_trailers) @@ -439,7 +439,7 @@ pub extern "C" fn proxy_on_queue_ready(context_id: u32, queue_id: u32) { } #[no_mangle] -pub extern "C" fn proxy_on_new_connection(context_id: u32) -> Action { +pub extern "C" fn proxy_on_new_connection(context_id: u32) -> FilterStatus { DISPATCHER.with(|dispatcher| dispatcher.on_new_connection(context_id)) } @@ -448,7 +448,7 @@ pub extern "C" fn proxy_on_downstream_data( context_id: u32, data_size: usize, end_of_stream: bool, -) -> Action { +) -> FilterStatus { DISPATCHER .with(|dispatcher| dispatcher.on_downstream_data(context_id, data_size, end_of_stream)) } @@ -463,7 +463,7 @@ pub extern "C" fn proxy_on_upstream_data( context_id: u32, data_size: usize, end_of_stream: bool, -) -> Action { +) -> FilterStatus { DISPATCHER.with(|dispatcher| dispatcher.on_upstream_data(context_id, data_size, end_of_stream)) } @@ -473,7 +473,7 @@ pub extern "C" fn proxy_on_upstream_connection_close(context_id: u32, peer_type: } #[no_mangle] -pub extern "C" fn proxy_on_request_headers(context_id: u32, num_headers: usize) -> Action { +pub extern "C" fn proxy_on_request_headers(context_id: u32, num_headers: usize) -> FilterHeadersStatus { DISPATCHER.with(|dispatcher| dispatcher.on_http_request_headers(context_id, num_headers)) } @@ -482,18 +482,18 @@ pub extern "C" fn proxy_on_request_body( context_id: u32, body_size: usize, end_of_stream: bool, -) -> Action { +) -> FilterDataStatus { DISPATCHER .with(|dispatcher| dispatcher.on_http_request_body(context_id, body_size, end_of_stream)) } #[no_mangle] -pub extern "C" fn proxy_on_request_trailers(context_id: u32, num_trailers: usize) -> Action { +pub extern "C" fn proxy_on_request_trailers(context_id: u32, num_trailers: usize) -> FilterTrailersStatus { DISPATCHER.with(|dispatcher| dispatcher.on_http_request_trailers(context_id, num_trailers)) } #[no_mangle] -pub extern "C" fn proxy_on_response_headers(context_id: u32, num_headers: usize) -> Action { +pub extern "C" fn proxy_on_response_headers(context_id: u32, num_headers: usize) -> FilterHeadersStatus { DISPATCHER.with(|dispatcher| dispatcher.on_http_response_headers(context_id, num_headers)) } @@ -502,13 +502,13 @@ pub extern "C" fn proxy_on_response_body( context_id: u32, body_size: usize, end_of_stream: bool, -) -> Action { +) -> FilterDataStatus { DISPATCHER .with(|dispatcher| dispatcher.on_http_response_body(context_id, body_size, end_of_stream)) } #[no_mangle] -pub extern "C" fn proxy_on_response_trailers(context_id: u32, num_trailers: usize) -> Action { +pub extern "C" fn proxy_on_response_trailers(context_id: u32, num_trailers: usize) -> FilterTrailersStatus { DISPATCHER.with(|dispatcher| dispatcher.on_http_response_trailers(context_id, num_trailers)) } diff --git a/src/traits.rs b/src/traits.rs index 57f2fd3c..2f5bbbfb 100644 --- a/src/traits.rs +++ b/src/traits.rs @@ -136,12 +136,12 @@ pub trait RootContext: Context { } pub trait StreamContext: Context { - fn on_new_connection(&mut self) -> Action { - Action::Continue + fn on_new_connection(&mut self) -> FilterStatus { + FilterStatus::Continue } - fn on_downstream_data(&mut self, _data_size: usize, _end_of_stream: bool) -> Action { - Action::Continue + fn on_downstream_data(&mut self, _data_size: usize, _end_of_stream: bool) -> FilterStatus { + FilterStatus::Continue } fn get_downstream_data(&self, start: usize, max_size: usize) -> Option { @@ -154,8 +154,8 @@ pub trait StreamContext: Context { fn on_downstream_close(&mut self, _peer_type: PeerType) {} - fn on_upstream_data(&mut self, _data_size: usize, _end_of_stream: bool) -> Action { - Action::Continue + fn on_upstream_data(&mut self, _data_size: usize, _end_of_stream: bool) -> FilterStatus { + FilterStatus::Continue } fn get_upstream_data(&self, start: usize, max_size: usize) -> Option { @@ -172,8 +172,8 @@ pub trait StreamContext: Context { } pub trait HttpContext: Context { - fn on_http_request_headers(&mut self, _num_headers: usize) -> Action { - Action::Continue + fn on_http_request_headers(&mut self, _num_headers: usize) -> FilterHeadersStatus { + FilterHeadersStatus::Continue } fn get_http_request_headers(&self) -> Vec<(String, String)> { @@ -196,8 +196,8 @@ pub trait HttpContext: Context { hostcalls::add_map_value(MapType::HttpRequestHeaders, &name, value).unwrap() } - fn on_http_request_body(&mut self, _body_size: usize, _end_of_stream: bool) -> Action { - Action::Continue + fn on_http_request_body(&mut self, _body_size: usize, _end_of_stream: bool) -> FilterDataStatus { + FilterDataStatus::Continue } fn get_http_request_body(&self, start: usize, max_size: usize) -> Option { @@ -208,8 +208,8 @@ pub trait HttpContext: Context { hostcalls::set_buffer(BufferType::HttpRequestBody, start, size, value).unwrap() } - fn on_http_request_trailers(&mut self, _num_trailers: usize) -> Action { - Action::Continue + fn on_http_request_trailers(&mut self, _num_trailers: usize) -> FilterTrailersStatus { + FilterTrailersStatus::Continue } fn get_http_request_trailers(&self) -> Vec<(String, String)> { @@ -236,8 +236,8 @@ pub trait HttpContext: Context { hostcalls::resume_http_request().unwrap() } - fn on_http_response_headers(&mut self, _num_headers: usize) -> Action { - Action::Continue + fn on_http_response_headers(&mut self, _num_headers: usize) -> FilterHeadersStatus { + FilterHeadersStatus::Continue } fn get_http_response_headers(&self) -> Vec<(String, String)> { @@ -260,8 +260,8 @@ pub trait HttpContext: Context { hostcalls::add_map_value(MapType::HttpResponseHeaders, &name, value).unwrap() } - fn on_http_response_body(&mut self, _body_size: usize, _end_of_stream: bool) -> Action { - Action::Continue + fn on_http_response_body(&mut self, _body_size: usize, _end_of_stream: bool) -> FilterDataStatus { + FilterDataStatus::Continue } fn get_http_response_body(&self, start: usize, max_size: usize) -> Option { @@ -272,8 +272,8 @@ pub trait HttpContext: Context { hostcalls::set_buffer(BufferType::HttpResponseBody, start, size, value).unwrap() } - fn on_http_response_trailers(&mut self, _num_trailers: usize) -> Action { - Action::Continue + fn on_http_response_trailers(&mut self, _num_trailers: usize) -> FilterTrailersStatus { + FilterTrailersStatus::Continue } fn get_http_response_trailers(&self) -> Vec<(String, String)> { diff --git a/src/types.rs b/src/types.rs index a951f78f..8d170d44 100644 --- a/src/types.rs +++ b/src/types.rs @@ -31,9 +31,41 @@ pub enum LogLevel { #[repr(u32)] #[derive(Debug)] -pub enum Action { +pub enum FilterStatus { Continue = 0, - Pause = 1, + StopIteration = 1, +} + +#[repr(u32)] +#[derive(Debug)] +pub enum FilterHeadersStatus { + Continue = 0, + StopIteration = 1, + ContinueAndEndStream = 2, + StopAllIterationAndBuffer = 3, + StopAllIterationAndWatermark = 4, +} + +#[repr(u32)] +#[derive(Debug)] +pub enum FilterMetadataStatus { + Continue = 0, +} + +#[repr(u32)] +#[derive(Debug)] +pub enum FilterTrailersStatus { + Continue = 0, + StopIteration = 1, +} + +#[repr(u32)] +#[derive(Debug)] +pub enum FilterDataStatus { + Continue = 0, + StopIterationAndBuffer = 1, + StopIterationAndWatermark = 2, + StopIterationNoBuffer = 3, } #[repr(u32)] @@ -42,9 +74,16 @@ pub enum Status { Ok = 0, NotFound = 1, BadArgument = 2, + SerializationFailure = 3, + ParseFailure = 4, + BadExpression = 5, + InvalidMemoryAccess = 6, Empty = 7, CasMismatch = 8, + ResultMismatch = 9, InternalFailure = 10, + BrokenConnection = 11, + Unimplemented = 12, } #[repr(u32)] @@ -55,6 +94,10 @@ pub enum BufferType { DownstreamData = 2, UpstreamData = 3, HttpCallResponseBody = 4, + GrpcReceiveBuffer = 5, + VmConfiguration = 6, + PluginConfiguration = 7, + CallData = 8, } #[repr(u32)] @@ -64,6 +107,8 @@ pub enum MapType { HttpRequestTrailers = 1, HttpResponseHeaders = 2, HttpResponseTrailers = 3, + GrpcReceiveInitialMetadata = 4, + GrpcReceiveTrailingMetadata = 5, HttpCallResponseHeaders = 6, HttpCallResponseTrailers = 7, } @@ -85,3 +130,10 @@ pub enum MetricType { } pub type Bytes = Vec; + +#[repr(u32)] +#[derive(Debug)] +pub enum StreamType { + Request = 0, + Response = 1, +}