diff --git a/.github/workflows/cibuild.yml b/.github/workflows/cibuild.yml index 0a381d8e9..5f8f48669 100644 --- a/.github/workflows/cibuild.yml +++ b/.github/workflows/cibuild.yml @@ -42,10 +42,10 @@ jobs: path: target key: ${{ runner.os }}-cargo-build-target-${{ hashFiles('**/Cargo.toml') }} - - name: Install stable toolchain + - name: Install nightly toolchain uses: actions-rs/toolchain@v1 with: - toolchain: stable + toolchain: nightly override: true components: rustfmt, clippy @@ -85,18 +85,17 @@ jobs: path: target key: ${{ runner.os }}-cargo-build-target-${{ hashFiles('**/Cargo.toml') }} - - name: Install stable toolchain + - name: Install nightly toolchain uses: actions-rs/toolchain@v1 with: - toolchain: stable + toolchain: nightly override: true components: rustfmt, clippy - - name: Run cargo test uses: actions-rs/cargo@v1 with: command: test - args: --workspace + args: -p pravega-rust-client-channel -p pravega-controller-client -p pravega-rust-client-integration-test -p pravega-rust-client-retry -p pravega-rust-client-shared -p pravega-wire-protocol -p pravega-client-rust - name: Run code cov run: | @@ -131,10 +130,10 @@ jobs: path: target key: ${{ runner.os }}-cargo-build-target-${{ hashFiles('**/Cargo.toml') }} - - name: Install stable toolchain + - name: Install nightly toolchain uses: actions-rs/toolchain@v1 with: - toolchain: stable + toolchain: nightly override: true components: rustfmt, clippy @@ -174,10 +173,10 @@ jobs: path: target key: ${{ runner.os }}-cargo-build-target-${{ hashFiles('**/Cargo.toml') }} - - name: Install stable toolchain + - name: Install nightly toolchain uses: actions-rs/toolchain@v1 with: - toolchain: stable + toolchain: nightly override: true components: rustfmt, clippy diff --git a/.gitignore b/.gitignore index 855e9bdfb..b1f2f77ae 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,6 @@ pravega-*.tgz .vscode /shared/target/ .gradle +*.log + +__pycache__/ diff --git a/Cargo.toml b/Cargo.toml index 687fba448..9d3119806 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2018" categories = ["Network programming"] keywords = ["streaming", "client", "pravega"] -readme = "Readme.md" +readme = "README.md" repository = "https://github.com/pravega/pravega-client-rust" license = "Apache-2.0" description = "A Rust client for Pravega. (Pravega.io)" @@ -15,7 +15,7 @@ authors = ["Tom Kaitchuck ", "Wenqi Mou EventStreamWriter { let scope_name = Scope::new("testWriterPerf".into()); let stream_name = Stream::new("testWriterPerf".into()); - let client_factory = ClientFactory::new(config.clone()); + let client_factory = ClientFactory::new(config.clone()).await; let controller_client = client_factory.get_controller_client(); create_scope_stream(controller_client, &scope_name, &stream_name, 1).await; let scoped_stream = ScopedStream { diff --git a/bindings/Cargo.toml b/bindings/Cargo.toml new file mode 100644 index 000000000..16f499b80 --- /dev/null +++ b/bindings/Cargo.toml @@ -0,0 +1,44 @@ +[package] +name = "pravega-rust-client-bindings" +version = "0.1.0" +edition = "2018" +categories = ["Network programming"] +keywords = ["streaming", "client", "pravega"] +readme = "Readme.md" +repository = "https://github.com/pravega/pravega-client-rust" +license = "Apache-2.0" +description = "An internal library used by the Rust client for Pravega to generated language bindings for Python and WASM." +authors = ["Tom Kaitchuck ", "Wenqi Mou ", + "Sandeep Shridhar ", "Wenxiao Zhang "] + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[lib] +name = "pravega_client" +crate-type = ["cdylib"] + +[features] +default = ["python_binding"] +javascript_binding = ["wasm-bindgen"] +python_binding = ["pyo3"] + +#Run tests for bindings using command cargo test --no-default-features + +[dependencies] +log = "0.4" +pravega-client-rust = { path = "../" } +pravega-wire-protocol = { path = "../wire_protocol"} +pravega-controller-client = { path = "../controller-client"} +pravega-rust-client-shared = { path = "../shared"} +pravega-rust-client-retry = {path = "../retry"} +pravega-connection-pool = {path= "../connection_pool"} +tokio = { version = "0.2.13", features = ["full"] } +lazy_static = "1.4.0" +uuid = {version = "0.8", features = ["v4"]} +futures = "0.3.5" +derive-new = "0.5" +#Python bindings +pyo3 = { features = ["extension-module"], optional = true, version = "0.10.1" } +#WASM bindings +wasm-bindgen = { version = "0.2.63", optional = true } +cfg-if = "0.1.10" diff --git a/bindings/Readme.md b/bindings/Readme.md new file mode 100644 index 000000000..b9ba89bb9 --- /dev/null +++ b/bindings/Readme.md @@ -0,0 +1,3 @@ +# Language Bindings + +This provides a way to generate multiple language bindings to interact with Pravega. \ No newline at end of file diff --git a/bindings/pyproject.toml b/bindings/pyproject.toml new file mode 100644 index 000000000..90cb176e8 --- /dev/null +++ b/bindings/pyproject.toml @@ -0,0 +1,3 @@ +[build-system] +requires = ["maturin"] +build-backend = "maturin" diff --git a/bindings/src/lib.rs b/bindings/src/lib.rs new file mode 100644 index 000000000..6669d6a48 --- /dev/null +++ b/bindings/src/lib.rs @@ -0,0 +1,33 @@ +// +// Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// + +#[macro_use] +extern crate cfg_if; + +mod stream_manager; +mod stream_writer; +cfg_if! { + if #[cfg(feature = "python_binding")] { + use pyo3::prelude::*; + use stream_manager::StreamManager; + #[macro_use] + extern crate derive_new; + use stream_writer::StreamWriter; + } +} + +#[cfg(feature = "python_binding")] +#[pymodule] +/// A Python module implemented in Rust. +fn pravega_client(_py: Python, m: &PyModule) -> PyResult<()> { + m.add_class::()?; + m.add_class::()?; + Ok(()) +} diff --git a/bindings/src/pravega_client_test.py b/bindings/src/pravega_client_test.py new file mode 100644 index 000000000..d5b27a723 --- /dev/null +++ b/bindings/src/pravega_client_test.py @@ -0,0 +1,36 @@ +# +# Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# + +import unittest +import pravega_client; + +class PravegaTest(unittest.TestCase): + def test_writeEvent(self): + print("Creating a Stream Manager, ensure pravega is running") + stream_manager=pravega_client.StreamManager("127.0.0.1:9090") + + print("Creating a scope") + scope_result=stream_manager.create_scope("testScope") + self.assertEqual(True, scope_result, "Scope creation status") + + print("Creating a stream") + stream_result=stream_manager.create_stream("testScope", "testStream", 1) + self.assertEqual(True, stream_result, "Stream creation status") + + print("Creating a writer for Stream") + w1=stream_manager.create_writer("testScope","testStream") + + print("Write events") + w1.write_event("test event1") + w1.write_event("test event2") + + +if __name__ == '__main__': + unittest.main() diff --git a/bindings/src/stream_manager.rs b/bindings/src/stream_manager.rs new file mode 100644 index 000000000..c21ac4c6c --- /dev/null +++ b/bindings/src/stream_manager.rs @@ -0,0 +1,214 @@ +// +// Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// + +cfg_if! { + if #[cfg(feature = "python_binding")] { + use crate::stream_writer::StreamWriter; + use pravega_client_rust::client_factory::ClientFactory; + use pravega_rust_client_shared::*; + use pravega_wire_protocol::client_config::{ClientConfig, ClientConfigBuilder}; + use pyo3::prelude::*; + use pyo3::PyResult; + use pyo3::{exceptions, PyObjectProtocol}; + use std::net::SocketAddr; + use tokio::runtime::Runtime; + } +} + +#[cfg(feature = "python_binding")] +#[pyclass] +pub(crate) struct StreamManager { + controller_ip: String, + rt: Runtime, + cf: ClientFactory, + config: ClientConfig, +} + +#[cfg(feature = "python_binding")] +#[pymethods] +impl StreamManager { + #[new] + fn new(controller_uri: String) -> Self { + let runtime = tokio::runtime::Runtime::new().expect("create runtime"); + let handle = runtime.handle().clone(); + let config = ClientConfigBuilder::default() + .controller_uri( + controller_uri + .parse::() + .expect("Parsing controller ip"), + ) + .build() + .expect("creating config"); + let client_factory = handle.block_on(ClientFactory::new(config.clone())); + + StreamManager { + controller_ip: controller_uri, + rt: runtime, + cf: client_factory, + config, + } + } + + /// + /// Create a Scope in Pravega. + /// + pub fn create_scope(&self, scope_name: &str) -> PyResult { + let handle = self.rt.handle().clone(); + println!("creating scope {:?}", scope_name); + + let controller = self.cf.get_controller_client(); + let scope_name = Scope::new(scope_name.to_string()); + + let scope_result = handle.block_on(controller.create_scope(&scope_name)); + println!("Scope creation status {:?}", scope_result); + match scope_result { + Ok(t) => Ok(t), + Err(e) => Err(exceptions::ValueError::py_err(format!("{:?}", e))), + } + } + + /// + /// Delete a Scope in Pravega. + /// + pub fn delete_scope(&self, scope_name: &str) -> PyResult { + let handle = self.rt.handle().clone(); + println!("Delete scope {:?}", scope_name); + + let controller = self.cf.get_controller_client(); + let scope_name = Scope::new(scope_name.to_string()); + + let scope_result = handle.block_on(controller.delete_scope(&scope_name)); + println!("Scope deletion status {:?}", scope_result); + match scope_result { + Ok(t) => Ok(t), + Err(e) => Err(exceptions::ValueError::py_err(format!("{:?}", e))), + } + } + + /// + /// Create a Stream in Pravega. + /// + pub fn create_stream( + &self, + scope_name: &str, + stream_name: &str, + initial_segments: i32, + ) -> PyResult { + let handle = self.rt.handle().clone(); + println!( + "creating stream {:?} under scope {:?} with segment count {:?}", + stream_name, scope_name, initial_segments + ); + let stream_cfg = StreamConfiguration { + scoped_stream: ScopedStream { + scope: Scope::new(scope_name.to_string()), + stream: Stream::new(stream_name.to_string()), + }, + scaling: Scaling { + scale_type: ScaleType::FixedNumSegments, + target_rate: 0, + scale_factor: 0, + min_num_segments: initial_segments, + }, + retention: Retention { + retention_type: RetentionType::None, + retention_param: 0, + }, + }; + let controller = self.cf.get_controller_client(); + + let stream_result = handle.block_on(controller.create_stream(&stream_cfg)); + println!("Stream creation status {:?}", stream_result); + match stream_result { + Ok(t) => Ok(t), + Err(e) => Err(exceptions::ValueError::py_err(format!("{:?}", e))), + } + } + + /// + /// Create a Stream in Pravega. + /// + pub fn seal_stream(&self, scope_name: &str, stream_name: &str) -> PyResult { + let handle = self.rt.handle().clone(); + println!("Sealing stream {:?} under scope {:?} ", stream_name, scope_name); + let scoped_stream = ScopedStream { + scope: Scope::new(scope_name.to_string()), + stream: Stream::new(stream_name.to_string()), + }; + + let controller = self.cf.get_controller_client(); + + let stream_result = handle.block_on(controller.seal_stream(&scoped_stream)); + println!("Sealing stream status {:?}", stream_result); + match stream_result { + Ok(t) => Ok(t), + Err(e) => Err(exceptions::ValueError::py_err(format!("{:?}", e))), + } + } + + /// + /// Delete a Stream in Pravega. + /// + pub fn delete_stream(&self, scope_name: &str, stream_name: &str) -> PyResult { + let handle = self.rt.handle().clone(); + println!("Deleting stream {:?} under scope {:?} ", stream_name, scope_name); + let scoped_stream = ScopedStream { + scope: Scope::new(scope_name.to_string()), + stream: Stream::new(stream_name.to_string()), + }; + + let controller = self.cf.get_controller_client(); + let stream_result = handle.block_on(controller.delete_stream(&scoped_stream)); + println!("Deleting stream status {:?}", stream_result); + match stream_result { + Ok(t) => Ok(t), + Err(e) => Err(exceptions::ValueError::py_err(format!("{:?}", e))), + } + } + + /// + /// Create a Writer for a given Stream. + /// + pub fn create_writer(&self, scope_name: &str, stream_name: &str) -> PyResult { + let scoped_stream = ScopedStream { + scope: Scope::new(scope_name.to_string()), + stream: Stream::new(stream_name.to_string()), + }; + let stream_writer = self.rt.handle().clone().block_on(async { + StreamWriter::new( + self.cf + .create_event_stream_writer(scoped_stream, self.config.clone()), + self.rt.handle().clone(), + ) + }); + Ok(stream_writer) + } + + /// Returns the facet string representation. + fn to_str(&self) -> String { + format!( + "Controller ip: {:?} ClientConfig: {:?}", + self.controller_ip, self.config + ) + } +} + +/// +/// Refer https://docs.python.org/3/reference/datamodel.html#basic-customization +/// This function will be called by the repr() built-in function to compute the “official” string +/// representation of an Python object. +/// +#[cfg(feature = "python_binding")] +#[pyproto] +impl PyObjectProtocol for StreamManager { + fn __repr__(&self) -> PyResult { + Ok(format!("StreamManager({})", self.to_str())) + } +} diff --git a/bindings/src/stream_writer.rs b/bindings/src/stream_writer.rs new file mode 100644 index 000000000..9c585969f --- /dev/null +++ b/bindings/src/stream_writer.rs @@ -0,0 +1,86 @@ +// +// Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// + +cfg_if! { + if #[cfg(feature = "python_binding")] { + use pravega_client_rust::error::EventStreamWriterError; + use pravega_client_rust::event_stream_writer::EventStreamWriter; + use pyo3::exceptions; + use pyo3::prelude::*; + use pyo3::PyResult; + use tokio::runtime::Handle; + } +} + +#[cfg(feature = "python_binding")] +#[pyclass] +#[derive(new)] // this ensures the python object cannot be created without the using StreamManager. +pub(crate) struct StreamWriter { + writer: EventStreamWriter, + handle: Handle, +} + +#[cfg(feature = "python_binding")] +#[pymethods] +impl StreamWriter { + /// + /// Write an event as a String into to the Pravega Stream. The operation blocks until the write operations is completed. + /// + pub fn write_event(&mut self, event: String) -> PyResult<()> { + self.write_event_bytes(event.into_bytes()) // + } + + /// + /// Write an event into the Pravega Stream for the given routing key. + /// + pub fn write_event_by_routing_key(&mut self, event: String, routing_key: String) -> PyResult<()> { + self.write_event_by_routing_key_bytes(event.into_bytes(), routing_key) + } + + /// + /// Write an event to Pravega Stream. The operation blocks until the write operations is completed. + /// Python can also be used to convert a given object into bytes. + /// + /// E.g: + /// >>> e="test" + /// >>> b=e.encode("utf-8") // Python api to convert an object to byte array. + /// >>> w1.write_event_bytes(b) + /// + pub fn write_event_bytes(&mut self, event: Vec) -> PyResult<()> { + println!("Writing a single event"); + let result = self.handle.block_on(self.writer.write_event(event)); + let result_oneshot: Result<(), EventStreamWriterError> = + self.handle.block_on(result).expect("Write failed"); + + match result_oneshot { + Ok(_t) => Ok(()), + Err(e) => Err(exceptions::ValueError::py_err(format!("{:?}", e))), + } + } + + /// + /// Write an event to the Pravega Stream given a routing key. + /// + pub fn write_event_by_routing_key_bytes(&mut self, event: Vec, routing_key: String) -> PyResult<()> { + println!("Writing a single event for a given routing key"); + let result = self + .handle + .block_on(self.writer.write_event_by_routing_key(routing_key, event)); + let result_oneshot: Result<(), EventStreamWriterError> = self + .handle + .block_on(result) + .expect("Write for specified routing key failed"); + + match result_oneshot { + Ok(_t) => Ok(()), + Err(e) => Err(exceptions::ValueError::py_err(format!("{:?}", e))), + } + } +} diff --git a/bindings/tox.ini b/bindings/tox.ini new file mode 100644 index 000000000..db9408d19 --- /dev/null +++ b/bindings/tox.ini @@ -0,0 +1,12 @@ +[tox] +envlist = py36 +requires = tox-pyo3 + +[testenv] +pyo3 = True +deps = + pytest +commands = + pytest -vvvv + +skip_install = True \ No newline at end of file diff --git a/connection_pool/src/lib.rs b/connection_pool/src/lib.rs index 9cf6ec2b1..b78e6b244 100644 --- a/connection_pool/src/lib.rs +++ b/connection_pool/src/lib.rs @@ -21,8 +21,7 @@ clippy::cargo_common_metadata, clippy::mutex_integer, clippy::needless_borrow, - clippy::option_unwrap_used, - clippy::result_unwrap_used, + clippy::unwrap_used, clippy::similar_names )] #![allow(clippy::multiple_crate_versions)] diff --git a/controller-client/src/cli.rs b/controller-client/src/cli.rs index 8d07b08c8..39e08deb2 100644 --- a/controller-client/src/cli.rs +++ b/controller-client/src/cli.rs @@ -78,7 +78,7 @@ fn main() { .build() .expect("creating config"); // create a controller client. - let controller_client = ControllerClientImpl::new(config); + let controller_client = rt.block_on(ControllerClientImpl::new(config)); match opt.cmd { Command::CreateScope { scope_name } => { let scope_result = rt.block_on(controller_client.create_scope(&Scope::new(scope_name))); diff --git a/controller-client/src/lib.rs b/controller-client/src/lib.rs index 4d1656d20..37938008e 100644 --- a/controller-client/src/lib.rs +++ b/controller-client/src/lib.rs @@ -25,6 +25,7 @@ )] #![allow(clippy::multiple_crate_versions)] #![allow(dead_code)] +#![allow(clippy::similar_names)] use std::result::Result as StdResult; use std::time::{Duration, Instant}; @@ -229,7 +230,7 @@ pub struct ControllerClientImpl { channel: RwLock>, } -fn get_channel(config: &ClientConfig) -> Channel { +async fn get_channel(config: &ClientConfig) -> Channel { const HTTP_PREFIX: &str = "http://"; // Placeholder to add authentication headers. @@ -243,8 +244,7 @@ fn get_channel(config: &ClientConfig) -> Channel { let iterable_endpoints = (0..config.max_controller_connections).map(|_a| Channel::builder(uri_result.clone())); - - Channel::balance_list(iterable_endpoints) + async { Channel::balance_list(iterable_endpoints) }.await } #[allow(unused_variables)] @@ -275,7 +275,7 @@ impl ControllerClient for ControllerClientImpl { error_msg: "Operation failed".into(), }), }, - Err(status) => Err(self.map_grpc_error(operation_name, status)), + Err(status) => Err(self.map_grpc_error(operation_name, status).await), } } @@ -306,7 +306,7 @@ impl ControllerClient for ControllerClientImpl { error_msg: "Operation failed".into(), }), }, - Err(status) => Err(self.map_grpc_error(operation_name, status)), + Err(status) => Err(self.map_grpc_error(operation_name, status).await), } } @@ -336,7 +336,7 @@ impl ControllerClient for ControllerClientImpl { error_msg: "Operation failed".into(), }), }, - Err(status) => Err(self.map_grpc_error(operation_name, status)), + Err(status) => Err(self.map_grpc_error(operation_name, status).await), } } @@ -365,7 +365,7 @@ impl ControllerClient for ControllerClientImpl { error_msg: "Operation failed".into(), }), }, - Err(status) => Err(self.map_grpc_error(operation_name, status)), + Err(status) => Err(self.map_grpc_error(operation_name, status).await), } } @@ -394,7 +394,7 @@ impl ControllerClient for ControllerClientImpl { error_msg: "Operation failed".into(), }), }, - Err(status) => Err(self.map_grpc_error(operation_name, status)), + Err(status) => Err(self.map_grpc_error(operation_name, status).await), } } @@ -423,7 +423,7 @@ impl ControllerClient for ControllerClientImpl { error_msg: "Operation failed".into(), }), }, - Err(status) => Err(self.map_grpc_error(operation_name, status)), + Err(status) => Err(self.map_grpc_error(operation_name, status).await), } } @@ -453,7 +453,7 @@ impl ControllerClient for ControllerClientImpl { error_msg: "Operation failed".into(), }), }, - Err(status) => Err(self.map_grpc_error(operation_name, status)), + Err(status) => Err(self.map_grpc_error(operation_name, status).await), } } @@ -466,7 +466,7 @@ impl ControllerClient for ControllerClientImpl { let operation_name = "getCurrentSegments"; match op_status { Ok(segment_ranges) => Ok(StreamSegments::from(segment_ranges.into_inner())), - Err(status) => Err(self.map_grpc_error(operation_name, status)), + Err(status) => Err(self.map_grpc_error(operation_name, status).await), } } @@ -483,7 +483,7 @@ impl ControllerClient for ControllerClientImpl { let operation_name = "createTransaction"; match op_status { Ok(create_txn_response) => Ok(TxnSegments::from(create_txn_response.into_inner())), - Err(status) => Err(self.map_grpc_error(operation_name, status)), + Err(status) => Err(self.map_grpc_error(operation_name, status).await), } } @@ -535,7 +535,7 @@ impl ControllerClient for ControllerClientImpl { error_msg: "Operation failed".into(), }), }, - Err(status) => Err(self.map_grpc_error(operation_name, status)), + Err(status) => Err(self.map_grpc_error(operation_name, status).await), } } @@ -577,7 +577,7 @@ impl ControllerClient for ControllerClientImpl { error_msg: "Operation failed".into(), }), }, - Err(status) => Err(self.map_grpc_error(operation_name, status)), + Err(status) => Err(self.map_grpc_error(operation_name, status).await), } } @@ -613,7 +613,7 @@ impl ControllerClient for ControllerClientImpl { error_msg: "Operation failed".into(), }), }, - Err(status) => Err(self.map_grpc_error(operation_name, status)), + Err(status) => Err(self.map_grpc_error(operation_name, status).await), } } @@ -647,7 +647,7 @@ impl ControllerClient for ControllerClientImpl { error_msg: "Operation failed".into(), }), }, - Err(status) => Err(self.map_grpc_error(operation_name, status)), + Err(status) => Err(self.map_grpc_error(operation_name, status).await), } } @@ -659,7 +659,7 @@ impl ControllerClient for ControllerClientImpl { let operation_name = "get_endpoint"; match op_status { Ok(response) => Ok(response.into_inner()), - Err(status) => Err(self.map_grpc_error(operation_name, status)), + Err(status) => Err(self.map_grpc_error(operation_name, status).await), } .map(PravegaNodeUri::from) } @@ -685,7 +685,7 @@ impl ControllerClient for ControllerClientImpl { let operation_name = "get_successors_segment"; match op_status { Ok(response) => Ok(response.into_inner()), - Err(status) => Err(self.map_grpc_error(operation_name, status)), + Err(status) => Err(self.map_grpc_error(operation_name, status).await), } .map(StreamSegmentsWithPredecessors::from) } @@ -734,7 +734,7 @@ impl ControllerClient for ControllerClientImpl { }), } } - Err(status) => Err(self.map_grpc_error(operation_name, status)), + Err(status) => Err(self.map_grpc_error(operation_name, status).await), } } @@ -767,7 +767,7 @@ impl ControllerClient for ControllerClientImpl { error_msg: "Operation failed".into(), }), }, - Err(status) => Err(self.map_grpc_error(operation_name, status)), + Err(status) => Err(self.map_grpc_error(operation_name, status).await), } } } @@ -778,9 +778,9 @@ impl ControllerClientImpl { /// The requests will be load balanced across multiple connections and every connection supports /// multiplexing of requests. /// - pub fn new(config: ClientConfig) -> Self { + pub async fn new(config: ClientConfig) -> Self { // actual connection is established lazily. - let ch = get_channel(&config); + let ch = get_channel(&config).await; ControllerClientImpl { config, channel: RwLock::new(ControllerServiceClient::new(ch)), @@ -791,8 +791,8 @@ impl ControllerClientImpl { /// reset method needs to be invoked in the case of ConnectionError. /// This logic can be removed once https://github.com/tower-rs/tower/issues/383 is fixed. /// - pub fn reset(&self) { - let ch = get_channel(&self.config); + pub async fn reset(&self) { + let ch = get_channel(&self.config).await; let mut x = self.channel.write().unwrap(); *x = ControllerServiceClient::new(ch); } @@ -808,7 +808,7 @@ impl ControllerClientImpl { } // Method used to translate grpc errors to ControllerError. - fn map_grpc_error(&self, operation_name: &str, status: Status) -> ControllerError { + async fn map_grpc_error(&self, operation_name: &str, status: Status) -> ControllerError { match status.code() { Code::InvalidArgument | Code::NotFound @@ -822,7 +822,7 @@ impl ControllerClientImpl { error_msg: status.to_string(), }, Code::Unknown => { - self.reset(); + self.reset().await; ControllerError::ConnectionError { can_retry: true, error_msg: status.to_string(), diff --git a/controller-client/src/main.rs b/controller-client/src/main.rs index fb5237848..59257e0b7 100644 --- a/controller-client/src/main.rs +++ b/controller-client/src/main.rs @@ -24,7 +24,7 @@ async fn main() -> std::result::Result<(), Box> .build() .expect("creating config"); // start Pravega standalone before invoking this function. - let controller_client = ControllerClientImpl::new(config); + let controller_client = ControllerClientImpl::new(config).await; let scope_name = Scope::new("testScope123".into()); let stream_name = Stream::new("testStream".into()); diff --git a/controller-client/src/model_helper.rs b/controller-client/src/model_helper.rs index 709d2ba18..1ce2d762e 100644 --- a/controller-client/src/model_helper.rs +++ b/controller-client/src/model_helper.rs @@ -121,7 +121,7 @@ impl Into for pravega_rust_client_shared::StreamCu fn into(self) -> crate::controller::StreamCut { crate::controller::StreamCut { stream_info: Some(self.scoped_stream.into()), - cut: self.segment_offset_map.to_owned(), // create a clone + cut: self.segment_offset_map, } } } diff --git a/controller-client/src/test.rs b/controller-client/src/test.rs index 9fe509c91..4d4e7484c 100644 --- a/controller-client/src/test.rs +++ b/controller-client/src/test.rs @@ -19,7 +19,7 @@ async fn test_create_scope_error() { .build() .expect("build client config"); - let client = ControllerClientImpl::new(config); + let client = ControllerClientImpl::new(config).await; let request = Scope::new("testScope124".into()); let create_scope_result = client.create_scope(&request).await; @@ -40,7 +40,7 @@ async fn test_create_stream_error() { .controller_uri("127.0.0.1:9090".parse::().unwrap()) .build() .expect("build client config"); - let client = ControllerClientImpl::new(config); + let client = ControllerClientImpl::new(config).await; let request = StreamConfiguration { scoped_stream: ScopedStream { diff --git a/integration_test/src/controller_tests.rs b/integration_test/src/controller_tests.rs index 9e545b438..f156234e6 100644 --- a/integration_test/src/controller_tests.rs +++ b/integration_test/src/controller_tests.rs @@ -19,7 +19,7 @@ pub async fn test_controller_apis() { .controller_uri(TEST_CONTROLLER_URI) .build() .expect("creating config"); - let client_factory = ClientFactory::new(config.clone()); + let client_factory = ClientFactory::new(config.clone()).await; let controller = client_factory.get_controller_client(); let scope_name = Scope::new("testScope123".into()); diff --git a/integration_test/src/disconnection_tests.rs b/integration_test/src/disconnection_tests.rs index 42544266b..46cdf2762 100644 --- a/integration_test/src/disconnection_tests.rs +++ b/integration_test/src/disconnection_tests.rs @@ -86,7 +86,7 @@ async fn test_retry_while_start_pravega() { .controller_uri(controller_uri) .build() .expect("build client config"); - let controller_client = ControllerClientImpl::new(config); + let controller_client = ControllerClientImpl::new(config).await; let scope_name = Scope::new("retryScope".into()); @@ -143,7 +143,7 @@ async fn test_retry_with_unexpected_reply() { .build() .expect("build client config"); - let controller_client = ControllerClientImpl::new(config); + let controller_client = ControllerClientImpl::new(config).await; //Get the endpoint. let segment_name = ScopedSegment { diff --git a/integration_test/src/event_stream_writer_tests.rs b/integration_test/src/event_stream_writer_tests.rs index f17da29d5..38bc5043d 100644 --- a/integration_test/src/event_stream_writer_tests.rs +++ b/integration_test/src/event_stream_writer_tests.rs @@ -36,7 +36,7 @@ pub async fn test_event_stream_writer() { .controller_uri(TEST_CONTROLLER_URI) .build() .expect("creating config"); - let client_factory = ClientFactory::new(config.clone()); + let client_factory = ClientFactory::new(config.clone()).await; let controller_client = client_factory.get_controller_client(); create_scope_stream(controller_client, &scope_name, &stream_name, 1).await; diff --git a/integration_test/src/tablemap_tests.rs b/integration_test/src/tablemap_tests.rs index ae63a1da4..502c4dcec 100644 --- a/integration_test/src/tablemap_tests.rs +++ b/integration_test/src/tablemap_tests.rs @@ -27,7 +27,7 @@ pub async fn test_tablemap() { .build() .expect("creating config"); - let client_factory = ClientFactory::new(config.clone()); + let client_factory = ClientFactory::new(config.clone()).await; test_single_key_operations(&client_factory).await; test_multiple_key_operations(&client_factory).await; test_multiple_key_remove_operations(&client_factory).await; diff --git a/integration_test/src/wirecommand_tests.rs b/integration_test/src/wirecommand_tests.rs index 56950ce01..0ac01c310 100644 --- a/integration_test/src/wirecommand_tests.rs +++ b/integration_test/src/wirecommand_tests.rs @@ -39,61 +39,85 @@ lazy_static! { let manager = SegmentConnectionManager::new(cf, CONFIG.max_connections_in_pool); ConnectionPool::new(manager) }; - static ref CONTROLLER_CLIENT: ControllerClientImpl = { ControllerClientImpl::new(CONFIG.clone()) }; } pub async fn wirecommand_test_wrapper() { - let timeout_second = time::Duration::from_secs(30); + let controller: ControllerClientImpl = ControllerClientImpl::new(CONFIG.clone()).await; - timeout(timeout_second, test_hello()).await.unwrap(); + let timeout_second = time::Duration::from_secs(30); - timeout(timeout_second, test_keep_alive()).await.unwrap(); + timeout(timeout_second, test_hello(&controller)).await.unwrap(); - timeout(timeout_second, test_setup_append()).await.unwrap(); + timeout(timeout_second, test_keep_alive(&controller)) + .await + .unwrap(); - timeout(timeout_second, test_create_segment()).await.unwrap(); + timeout(timeout_second, test_setup_append(&controller)) + .await + .unwrap(); - timeout(timeout_second, test_update_and_get_segment_attribute()) + timeout(timeout_second, test_create_segment(&controller)) .await .unwrap(); - timeout(timeout_second, test_get_stream_segment_info()) + timeout(timeout_second, test_update_and_get_segment_attribute(&controller)) .await .unwrap(); - timeout(timeout_second, test_seal_segment()).await.unwrap(); + timeout(timeout_second, test_get_stream_segment_info(&controller)) + .await + .unwrap(); - timeout(timeout_second, test_delete_segment()).await.unwrap(); + timeout(timeout_second, test_seal_segment(&controller)) + .await + .unwrap(); - timeout(timeout_second, test_conditional_append_and_read_segment()) + timeout(timeout_second, test_delete_segment(&controller)) .await .unwrap(); - timeout(timeout_second, test_update_segment_policy()) + timeout( + timeout_second, + test_conditional_append_and_read_segment(&controller), + ) + .await + .unwrap(); + + timeout(timeout_second, test_update_segment_policy(&controller)) .await .unwrap(); - timeout(timeout_second, test_merge_segment()).await.unwrap(); + timeout(timeout_second, test_merge_segment(&controller)) + .await + .unwrap(); - timeout(timeout_second, test_truncate_segment()).await.unwrap(); + timeout(timeout_second, test_truncate_segment(&controller)) + .await + .unwrap(); - timeout(timeout_second, test_update_table_entries()) + timeout(timeout_second, test_update_table_entries(&controller)) .await .unwrap(); - timeout(timeout_second, test_read_table_key()).await.unwrap(); + timeout(timeout_second, test_read_table_key(&controller)) + .await + .unwrap(); - timeout(timeout_second, test_read_table()).await.unwrap(); + timeout(timeout_second, test_read_table(&controller)) + .await + .unwrap(); - timeout(timeout_second, test_read_table_entries()).await.unwrap(); + timeout(timeout_second, test_read_table_entries(&controller)) + .await + .unwrap(); } -async fn test_hello() { +async fn test_hello(controller_client: &ControllerClientImpl) { let scope_name = Scope::new("testScope".into()); let stream_name = Stream::new("testStream".into()); // Create scope and stream - CONTROLLER_CLIENT + controller_client .create_scope(&scope_name) .await .expect("create scope"); @@ -114,7 +138,7 @@ async fn test_hello() { retention_param: 0, }, }; - CONTROLLER_CLIENT + controller_client .create_stream(&request) .await .expect("create stream"); @@ -124,7 +148,7 @@ async fn test_hello() { stream: stream_name.clone(), segment: Segment { number: 0 }, }; - let endpoint = CONTROLLER_CLIENT + let endpoint = controller_client .get_endpoint_for_segment(&segment_name) .await .expect("get endpoint for segment") @@ -150,7 +174,7 @@ async fn test_hello() { } // KeepAlive would not send back reply. -async fn test_keep_alive() { +async fn test_keep_alive(controller_client: &ControllerClientImpl) { let scope_name = Scope::new("testScope".into()); let stream_name = Stream::new("testStream".into()); let segment_name = ScopedSegment { @@ -159,7 +183,7 @@ async fn test_keep_alive() { segment: Segment { number: 0 }, }; - let endpoint = CONTROLLER_CLIENT + let endpoint = controller_client .get_endpoint_for_segment(&segment_name) .await .expect("get endpoint for segment") @@ -175,7 +199,7 @@ async fn test_keep_alive() { client_connection.write(&request).await.expect("send request"); } -async fn test_setup_append() { +async fn test_setup_append(controller_client: &ControllerClientImpl) { let scope_name = Scope::new("testScope".into()); let stream_name = Stream::new("testStream".into()); let segment_name = ScopedSegment { @@ -184,7 +208,7 @@ async fn test_setup_append() { segment: Segment { number: 0 }, }; - let endpoint = CONTROLLER_CLIENT + let endpoint = controller_client .get_endpoint_for_segment(&segment_name) .await .expect("get endpoint for segment") @@ -238,7 +262,7 @@ async fn test_setup_append() { .map_or_else(|e| panic!("failed to get reply: {}", e), |r| assert_eq!(reply, r)); } -async fn test_create_segment() { +async fn test_create_segment(controller_client: &ControllerClientImpl) { let scope_name = Scope::new("testScope".into()); let stream_name = Stream::new("testStream".into()); let segment_name = ScopedSegment { @@ -247,7 +271,7 @@ async fn test_create_segment() { segment: Segment { number: 1 }, }; - let endpoint = CONTROLLER_CLIENT + let endpoint = controller_client .get_endpoint_for_segment(&segment_name) .await .expect("get endpoint for segment") @@ -290,7 +314,7 @@ async fn test_create_segment() { ); } -async fn test_seal_segment() { +async fn test_seal_segment(controller_client: &ControllerClientImpl) { let scope_name = Scope::new("testScope".into()); let stream_name = Stream::new("testStream".into()); let segment_name = ScopedSegment { @@ -299,7 +323,7 @@ async fn test_seal_segment() { segment: Segment { number: 1 }, }; - let endpoint = CONTROLLER_CLIENT + let endpoint = controller_client .get_endpoint_for_segment(&segment_name) .await .expect("get endpoint for segment") @@ -325,7 +349,7 @@ async fn test_seal_segment() { .map_or_else(|e| panic!("failed to get reply: {}", e), |r| assert_eq!(reply, r)); } -async fn test_update_and_get_segment_attribute() { +async fn test_update_and_get_segment_attribute(controller_client: &ControllerClientImpl) { let scope_name = Scope::new("testScope".into()); let stream_name = Stream::new("testStream".into()); let segment_name = ScopedSegment { @@ -334,7 +358,7 @@ async fn test_update_and_get_segment_attribute() { segment: Segment { number: 0 }, }; - let endpoint = CONTROLLER_CLIENT + let endpoint = controller_client .get_endpoint_for_segment(&segment_name) .await .expect("get endpoint for segment") @@ -379,7 +403,7 @@ async fn test_update_and_get_segment_attribute() { .map_or_else(|e| panic!("failed to get reply: {}", e), |r| assert_eq!(reply, r)); } -async fn test_get_stream_segment_info() { +async fn test_get_stream_segment_info(controller_client: &ControllerClientImpl) { let scope_name = Scope::new("testScope".into()); let stream_name = Stream::new("testStream".into()); let stream = ScopedStream { @@ -388,7 +412,7 @@ async fn test_get_stream_segment_info() { }; //seal this stream. - CONTROLLER_CLIENT.seal_stream(&stream).await.expect("seal stream"); + controller_client.seal_stream(&stream).await.expect("seal stream"); let segment_name = ScopedSegment { scope: scope_name.clone(), @@ -396,7 +420,7 @@ async fn test_get_stream_segment_info() { segment: Segment { number: 0 }, }; - let endpoint = CONTROLLER_CLIENT + let endpoint = controller_client .get_endpoint_for_segment(&segment_name) .await .expect("get endpoint for segment") @@ -422,7 +446,7 @@ async fn test_get_stream_segment_info() { } } -async fn test_delete_segment() { +async fn test_delete_segment(controller_client: &ControllerClientImpl) { let scope_name = Scope::new("testScope".into()); let stream_name = Stream::new("testStream".into()); let segment_name = ScopedSegment { @@ -431,7 +455,7 @@ async fn test_delete_segment() { segment: Segment { number: 0 }, }; - let endpoint = CONTROLLER_CLIENT + let endpoint = controller_client .get_endpoint_for_segment(&segment_name) .await .expect("get endpoint for segment") @@ -455,7 +479,7 @@ async fn test_delete_segment() { .map_or_else(|e| panic!("failed to get reply: {}", e), |r| assert_eq!(reply, r)); } -async fn test_conditional_append_and_read_segment() { +async fn test_conditional_append_and_read_segment(controller_client: &ControllerClientImpl) { // create a segment. let scope_name = Scope::new("scope".into()); let stream_name = Stream::new("stream".into()); @@ -466,7 +490,7 @@ async fn test_conditional_append_and_read_segment() { segment: Segment { number: 0 }, }; - let endpoint = CONTROLLER_CLIENT + let endpoint = controller_client .get_endpoint_for_segment(&segment_name) .await .expect("get endpoint for segment") @@ -541,7 +565,7 @@ async fn test_conditional_append_and_read_segment() { .map_or_else(|e| panic!("failed to get reply: {}", e), |r| assert_eq!(reply, r)); } -async fn test_update_segment_policy() { +async fn test_update_segment_policy(controller_client: &ControllerClientImpl) { let scope_name = Scope::new("scope".into()); let stream_name = Stream::new("stream".into()); @@ -551,7 +575,7 @@ async fn test_update_segment_policy() { segment: Segment { number: 0 }, }; - let endpoint = CONTROLLER_CLIENT + let endpoint = controller_client .get_endpoint_for_segment(&segment_name) .await .expect("get endpoint for segment") @@ -578,7 +602,7 @@ async fn test_update_segment_policy() { .map_or_else(|e| panic!("failed to get reply: {}", e), |r| assert_eq!(reply, r)); } -async fn test_merge_segment() { +async fn test_merge_segment(controller_client: &ControllerClientImpl) { let scope_name = Scope::new("scope".into()); let stream_name = Stream::new("stream".into()); @@ -588,7 +612,7 @@ async fn test_merge_segment() { segment: Segment { number: 1 }, }; - let endpoint = CONTROLLER_CLIENT + let endpoint = controller_client .get_endpoint_for_segment(&segment_name) .await .expect("get endpoint for segment") @@ -668,7 +692,7 @@ async fn test_merge_segment() { .map_or_else(|e| panic!("failed to get reply: {}", e), |r| assert_eq!(reply, r)); } -async fn test_truncate_segment() { +async fn test_truncate_segment(controller_client: &ControllerClientImpl) { let scope_name = Scope::new("scope".into()); let stream_name = Stream::new("stream".into()); @@ -678,7 +702,7 @@ async fn test_truncate_segment() { segment: Segment { number: 0 }, }; - let endpoint = CONTROLLER_CLIENT + let endpoint = controller_client .get_endpoint_for_segment(&segment_name) .await .expect("get endpoint for segment") @@ -705,7 +729,7 @@ async fn test_truncate_segment() { .map_or_else(|e| panic!("failed to get reply: {}", e), |r| assert_eq!(reply, r)); } -async fn test_update_table_entries() { +async fn test_update_table_entries(controller_client: &ControllerClientImpl) { let scope_name = Scope::new("scope".into()); let stream_name = Stream::new("stream".into()); // create a new segment. @@ -715,7 +739,7 @@ async fn test_update_table_entries() { segment: Segment { number: 2 }, }; - let endpoint = CONTROLLER_CLIENT + let endpoint = controller_client .get_endpoint_for_segment(&segment_name) .await .expect("get endpoint for segment") @@ -811,7 +835,7 @@ async fn test_update_table_entries() { .map_or_else(|e| panic!("failed to get reply: {}", e), |r| assert_eq!(reply, r)); } -async fn test_read_table_key() { +async fn test_read_table_key(controller_client: &ControllerClientImpl) { let scope_name = Scope::new("scope".into()); let stream_name = Stream::new("stream".into()); let segment_name = ScopedSegment { @@ -820,7 +844,7 @@ async fn test_read_table_key() { segment: Segment { number: 2 }, }; - let endpoint = CONTROLLER_CLIENT + let endpoint = controller_client .get_endpoint_for_segment(&segment_name) .await .expect("get endpoint for segment") @@ -851,7 +875,7 @@ async fn test_read_table_key() { } } -async fn test_read_table() { +async fn test_read_table(controller_client: &ControllerClientImpl) { let scope_name = Scope::new("scope".into()); let stream_name = Stream::new("stream".into()); let segment_name = ScopedSegment { @@ -860,7 +884,7 @@ async fn test_read_table() { segment: Segment { number: 2 }, }; - let endpoint = CONTROLLER_CLIENT + let endpoint = controller_client .get_endpoint_for_segment(&segment_name) .await .expect("get endpoint for segment") @@ -903,7 +927,7 @@ async fn test_read_table() { .map_or_else(|e| panic!("failed to get reply: {}", e), |r| assert_eq!(reply, r)); } -async fn test_read_table_entries() { +async fn test_read_table_entries(controller_client: &ControllerClientImpl) { let scope_name = Scope::new("scope".into()); let stream_name = Stream::new("stream".into()); let segment_name = ScopedSegment { @@ -912,7 +936,7 @@ async fn test_read_table_entries() { segment: Segment { number: 2 }, }; - let endpoint = CONTROLLER_CLIENT + let endpoint = controller_client .get_endpoint_for_segment(&segment_name) .await .expect("get endpoint for segment") diff --git a/retry/src/lib.rs b/retry/src/lib.rs index 410bc9364..cf0f2ed0e 100644 --- a/retry/src/lib.rs +++ b/retry/src/lib.rs @@ -35,8 +35,7 @@ clippy::cargo_common_metadata, clippy::mutex_integer, clippy::needless_borrow, - clippy::option_unwrap_used, - clippy::result_unwrap_used, + clippy::unwrap_used, clippy::similar_names )] #![allow(clippy::multiple_crate_versions)] diff --git a/rust-toolchain b/rust-toolchain index b07d35aa3..21b07a553 100644 --- a/rust-toolchain +++ b/rust-toolchain @@ -1,2 +1,2 @@ -stable - +nightly +## Change to stable once issue https://github.com/PyO3/pyo3/issues/5 is resolved, tracked by https://github.com/PyO3/pyo3/issues/210 diff --git a/shared/src/lib.rs b/shared/src/lib.rs index 246302195..6293bc081 100644 --- a/shared/src/lib.rs +++ b/shared/src/lib.rs @@ -20,8 +20,7 @@ clippy::cargo_common_metadata, clippy::mutex_integer, clippy::needless_borrow, - clippy::option_unwrap_used, - clippy::result_unwrap_used, + clippy::unwrap_used, clippy::similar_names )] #![allow(clippy::multiple_crate_versions)] diff --git a/src/client_factory.rs b/src/client_factory.rs index 3fdb8dd6e..dfb47b9fd 100644 --- a/src/client_factory.rs +++ b/src/client_factory.rs @@ -32,14 +32,14 @@ pub struct ClientFactoryInternal { } impl ClientFactory { - pub fn new(config: ClientConfig) -> ClientFactory { + pub async fn new(config: ClientConfig) -> ClientFactory { let _ = setup_logger(); //Ignore failure let cf = ConnectionFactory::create(config.connection_type); let pool = ConnectionPool::new(SegmentConnectionManager::new(cf, config.max_connections_in_pool)); let controller = if config.mock { Box::new(MockController::new(config.controller_uri)) as Box } else { - Box::new(ControllerClientImpl::new(config)) as Box + Box::new(ControllerClientImpl::new(config).await) as Box }; ClientFactory(Arc::new(ClientFactoryInternal { connection_pool: pool, diff --git a/src/lib.rs b/src/lib.rs index 38cafb282..92d715ccd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,8 +22,7 @@ clippy::cargo_common_metadata, clippy::mutex_integer, clippy::needless_borrow, - clippy::option_unwrap_used, - clippy::result_unwrap_used, + clippy::unwrap_used, clippy::similar_names )] #![allow(clippy::multiple_crate_versions)] diff --git a/wire_protocol/src/lib.rs b/wire_protocol/src/lib.rs index 564bc6762..f7eb3552f 100644 --- a/wire_protocol/src/lib.rs +++ b/wire_protocol/src/lib.rs @@ -21,8 +21,7 @@ clippy::cargo_common_metadata, clippy::mutex_integer, clippy::needless_borrow, - clippy::option_unwrap_used, - clippy::result_unwrap_used, + clippy::unwrap_used, clippy::similar_names )] #![allow(clippy::multiple_crate_versions)]