Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disconnection test #76

Merged
merged 29 commits into from
Apr 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 87 additions & 83 deletions controller-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
use std::result::Result as StdResult;
use std::time::Duration;

use snafu::ResultExt;
use snafu::Snafu;
use tonic::transport::channel::Channel;
use tonic::transport::Error as tonicError;
use tonic::{Code, Status};

use async_trait::async_trait;
Expand All @@ -45,7 +47,7 @@ use log::debug;
use pravega_rust_client_shared::*;
use pravega_wire_protocol::client_config::ClientConfig;
use pravega_wire_protocol::connection_pool::{ConnectionPool, Manager, PooledConnection};
use pravega_wire_protocol::error::*;
use pravega_wire_protocol::error::ConnectionPoolError;
use std::convert::{From, Into};
use std::net::SocketAddr;
use uuid::Uuid;
Expand Down Expand Up @@ -78,6 +80,13 @@ pub enum ControllerError {
can_retry: bool,
endpoint: String,
error_msg: String,
source: tonicError,
},
#[snafu(display("Could not get connection from connection pool"))]
PoolError {
can_retry: bool,
endpoint: String,
source: ConnectionPoolError,
},
}

Expand Down Expand Up @@ -218,11 +227,10 @@ impl ControllerClientImpl {
#[async_trait]
impl ControllerClient for ControllerClientImpl {
async fn create_scope(&self, scope: &Scope) -> Result<bool> {
let connection = self
.pool
.get_connection(self.endpoint)
.await
.expect("get connection");
let connection = self.pool.get_connection(self.endpoint).await.context(PoolError {
can_retry: true,
endpoint: self.endpoint.to_string(),
})?;
create_scope(scope, connection).await
}

Expand All @@ -231,74 +239,66 @@ impl ControllerClient for ControllerClientImpl {
}

async fn delete_scope(&self, scope: &Scope) -> Result<bool> {
let connection = self
.pool
.get_connection(self.endpoint)
.await
.expect("get connection");
let connection = self.pool.get_connection(self.endpoint).await.context(PoolError {
can_retry: true,
endpoint: self.endpoint.to_string(),
})?;
delete_scope(scope, connection).await
}

async fn create_stream(&self, stream_config: &StreamConfiguration) -> Result<bool> {
let connection = self
.pool
.get_connection(self.endpoint)
.await
.expect("get connection");
let connection = self.pool.get_connection(self.endpoint).await.context(PoolError {
can_retry: true,
endpoint: self.endpoint.to_string(),
})?;
create_stream(stream_config, connection).await
}

async fn update_stream(&self, stream_config: &StreamConfiguration) -> Result<bool> {
let connection = self
.pool
.get_connection(self.endpoint)
.await
.expect("get connection");
let connection = self.pool.get_connection(self.endpoint).await.context(PoolError {
can_retry: true,
endpoint: self.endpoint.to_string(),
})?;
update_stream(stream_config, connection).await
}

async fn truncate_stream(&self, stream_cut: &StreamCut) -> Result<bool> {
let connection = self
.pool
.get_connection(self.endpoint)
.await
.expect("get connection");
let connection = self.pool.get_connection(self.endpoint).await.context(PoolError {
can_retry: true,
endpoint: self.endpoint.to_string(),
})?;
truncate_stream(stream_cut, connection).await
}

async fn seal_stream(&self, stream: &ScopedStream) -> Result<bool> {
let connection = self
.pool
.get_connection(self.endpoint)
.await
.expect("get connection");
let connection = self.pool.get_connection(self.endpoint).await.context(PoolError {
can_retry: true,
endpoint: self.endpoint.to_string(),
})?;
seal_stream(stream, connection).await
}

async fn delete_stream(&self, stream: &ScopedStream) -> Result<bool> {
let connection = self
.pool
.get_connection(self.endpoint)
.await
.expect("get connection");
let connection = self.pool.get_connection(self.endpoint).await.context(PoolError {
can_retry: true,
endpoint: self.endpoint.to_string(),
})?;
delete_stream(stream, connection).await
}

async fn get_current_segments(&self, stream: &ScopedStream) -> Result<StreamSegments> {
let connection = self
.pool
.get_connection(self.endpoint)
.await
.expect("get connection");
let connection = self.pool.get_connection(self.endpoint).await.context(PoolError {
can_retry: true,
endpoint: self.endpoint.to_string(),
})?;
get_current_segments(stream, connection).await
}

async fn create_transaction(&self, stream: &ScopedStream, lease: Duration) -> Result<TxnSegments> {
let connection = self
.pool
.get_connection(self.endpoint)
.await
.expect("get connection");
let connection = self.pool.get_connection(self.endpoint).await.context(PoolError {
can_retry: true,
endpoint: self.endpoint.to_string(),
})?;
create_transaction(stream, lease, connection).await
}

Expand All @@ -308,11 +308,10 @@ impl ControllerClient for ControllerClientImpl {
tx_id: TxId,
lease: Duration,
) -> Result<PingStatus> {
let connection = self
.pool
.get_connection(self.endpoint)
.await
.expect("get connection");
let connection = self.pool.get_connection(self.endpoint).await.context(PoolError {
can_retry: true,
endpoint: self.endpoint.to_string(),
})?;
ping_transaction(stream, tx_id, lease, connection).await
}

Expand All @@ -323,20 +322,18 @@ impl ControllerClient for ControllerClientImpl {
writer_id: WriterId,
time: Timestamp,
) -> Result<()> {
let connection = self
.pool
.get_connection(self.endpoint)
.await
.expect("get connection");
let connection = self.pool.get_connection(self.endpoint).await.context(PoolError {
can_retry: true,
endpoint: self.endpoint.to_string(),
})?;
commit_transaction(stream, tx_id, writer_id, time, connection).await
}

async fn abort_transaction(&self, stream: &ScopedStream, tx_id: TxId) -> Result<()> {
let connection = self
.pool
.get_connection(self.endpoint)
.await
.expect("get connection");
let connection = self.pool.get_connection(self.endpoint).await.context(PoolError {
can_retry: true,
endpoint: self.endpoint.to_string(),
})?;
abort_transaction(stream, tx_id, connection).await
}

Expand All @@ -345,20 +342,18 @@ impl ControllerClient for ControllerClientImpl {
stream: &ScopedStream,
tx_id: TxId,
) -> Result<TransactionStatus> {
let connection = self
.pool
.get_connection(self.endpoint)
.await
.expect("get connection");
let connection = self.pool.get_connection(self.endpoint).await.context(PoolError {
can_retry: true,
endpoint: self.endpoint.to_string(),
})?;
check_transaction_status(stream, tx_id, connection).await
}

async fn get_endpoint_for_segment(&self, segment: &ScopedSegment) -> Result<PravegaNodeUri> {
let connection = self
.pool
.get_connection(self.endpoint)
.await
.expect("get connection");
let connection = self.pool.get_connection(self.endpoint).await.context(PoolError {
can_retry: true,
endpoint: self.endpoint.to_string(),
})?;
get_uri_segment(segment, connection).await
}

Expand All @@ -367,11 +362,10 @@ impl ControllerClient for ControllerClientImpl {
}

async fn get_successors(&self, segment: &ScopedSegment) -> Result<StreamSegmentsWithPredecessors> {
let connection = self
.pool
.get_connection(self.endpoint)
.await
.expect("get connection");
let connection = self.pool.get_connection(self.endpoint).await.context(PoolError {
can_retry: true,
endpoint: self.endpoint.to_string(),
})?;
get_successors(segment, connection).await
}
}
Expand Down Expand Up @@ -411,11 +405,17 @@ impl Manager for ControllerConnectionManager {
&self,
endpoint: SocketAddr,
) -> std::result::Result<Self::Conn, ConnectionPoolError> {
let channel = create_connection(&format!("{}{}", "http://", &endpoint.to_string())).await;
Ok(ControllerConnection::new(endpoint, channel))
let result = create_connection(&format!("{}{}", "http://", &endpoint.to_string())).await;
match result {
Ok(channel) => Ok(ControllerConnection::new(endpoint, channel)),
Err(_e) => Err(ConnectionPoolError::EstablishConnection {
endpoint: endpoint.to_string(),
error_msg: String::from("Could not establish connection"),
}),
}
}

fn is_valid(&self, _conn: &PooledConnection<'_, Self::Conn>) -> bool {
fn is_valid(&self, _conn: &Self::Conn) -> bool {
true
}

Expand All @@ -425,12 +425,16 @@ impl Manager for ControllerConnectionManager {
}

/// create_connection with the given controller uri.
pub async fn create_connection(uri: &str) -> ControllerServiceClient<Channel> {
async fn create_connection(uri: &str) -> Result<ControllerServiceClient<Channel>> {
// Placeholder to add authentication headers.
let connection: ControllerServiceClient<Channel> = ControllerServiceClient::connect(uri.to_string())
let connection = ControllerServiceClient::connect(uri.to_string())
.await
.expect("Failed to create a channel");
connection
.context(ConnectionError {
can_retry: true,
endpoint: String::from(uri),
error_msg: String::from("Connection Refused"),
})?;
Ok(connection)
}

// Method used to translate grpc errors to custom error.
Expand Down
3 changes: 2 additions & 1 deletion integration_test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ pravega-client-rust = { path = "../" }
pravega-wire-protocol = { path = "../wire_protocol"}
pravega-controller-client = { path = "../controller-client"}
pravega-rust-client-shared = { path = "../shared"}
tokio = { version = "0.2.8", features = ["full"] }
pravega-rust-client-retry = {path = "../retry"}
tokio = { version = "0.2.13", features = ["full"] }
lazy_static = "1.4.0"
uuid = {version = "0.8", features = ["v4"]}

Expand Down
Loading