Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 225: Implement list streams controller API #242

Merged
merged 16 commits into from
Apr 26, 2021
1 change: 0 additions & 1 deletion .github/workflows/python_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ on:
pull_request:
branches:
- master
- enable-tox

name: pythontest

Expand Down
1 change: 1 addition & 0 deletions controller-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ im = "15"
tracing = "0.1"
jsonwebtoken = "7"
serde = {version = "1.0", features = ["derive"] }
futures = "0.3"

[build-dependencies]
tonic-build = "0.4"
Expand Down
22 changes: 22 additions & 0 deletions controller-client/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ enum Command {
#[structopt(help = "Stream Name")]
stream_name: String,
},
/// List Streams under a scope
ListStreams {
#[structopt(help = "Scope Name")]
scope_name: String,
},
}

#[derive(StructOpt, Debug)]
Expand Down Expand Up @@ -124,5 +129,22 @@ fn main() {
let result = rt.block_on(controller_client.delete_stream(&scoped_stream));
println!("Delete stream status {:?}", result);
}
Command::ListStreams { scope_name } => {
use futures::future;
use futures::stream::StreamExt;
use pravega_controller_client::paginator::list_streams;

let scope = Scope::from(scope_name.clone());
let stream = list_streams(scope, &controller_client);
println!("Listing streams under scope {:?}", scope_name);
rt.block_on(stream.for_each(|stream| {
if stream.is_ok() {
println!("{:?}", stream.unwrap());
} else {
println!("Error while fetching data from Controller. Details: {:?}", stream);
}
future::ready(())
}));
}
}
}
140 changes: 125 additions & 15 deletions controller-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ use async_trait::async_trait;
use controller::{
controller_service_client::ControllerServiceClient, create_scope_status, create_stream_status,
delete_scope_status, delete_stream_status, ping_txn_status, scale_request, scale_response,
scale_status_response, txn_state, txn_status, update_stream_status, CreateScopeStatus,
scale_status_response, txn_state, txn_status, update_stream_status, ContinuationToken, CreateScopeStatus,
CreateStreamStatus, CreateTxnRequest, CreateTxnResponse, DelegationToken, DeleteScopeStatus,
DeleteStreamStatus, GetEpochSegmentsRequest, GetSegmentsRequest, NodeUri, PingTxnRequest, PingTxnStatus,
ScaleRequest, ScaleResponse, ScaleStatusRequest, ScaleStatusResponse, ScopeInfo, SegmentId,
SegmentRanges, SegmentsAtTime, StreamConfig, StreamInfo, SuccessorResponse, TxnId, TxnRequest, TxnState,
TxnStatus, UpdateStreamStatus,
SegmentRanges, SegmentsAtTime, StreamConfig, StreamInfo, StreamsInScopeRequest, StreamsInScopeResponse,
SuccessorResponse, TxnId, TxnRequest, TxnState, TxnStatus, UpdateStreamStatus,
};
use im::{HashMap as ImHashMap, OrdMap};
use ordered_float::OrderedFloat;
Expand All @@ -59,7 +59,7 @@ use tokio::runtime::Runtime;
use tonic::codegen::http::uri::InvalidUri;
use tonic::transport::{Certificate, Channel, ClientTlsConfig, Endpoint, Uri};
use tonic::{metadata::MetadataValue, Code, Request, Status};
use tracing::{debug, info};
use tracing::{debug, info, warn};

#[allow(non_camel_case_types)]
pub mod controller {
Expand All @@ -69,6 +69,7 @@ pub mod controller {

pub mod mock_controller;
mod model_helper;
pub mod paginator;

// Max number of retries by the controller in case of a retryable failure.
const MAX_RETRIES: i32 = 10;
Expand All @@ -89,6 +90,8 @@ pub enum ControllerError {
ConnectionError { can_retry: bool, error_msg: String },
#[snafu(display("Invalid configuration passed to the Controller client. Error {}", error_msg))]
InvalidConfiguration { can_retry: bool, error_msg: String },
#[snafu(display("Invalid response from the Controller. Error {}", error_msg))]
InvalidResponse { can_retry: bool, error_msg: String },
}

// Implementation of Retryable trait for the error thrown by the Controller.
Expand All @@ -110,6 +113,10 @@ impl Retryable for ControllerError {
can_retry,
error_msg: _,
} => *can_retry,
InvalidResponse {
can_retry,
error_msg: _,
} => *can_retry,
}
}
}
Expand All @@ -128,7 +135,15 @@ pub trait ControllerClient: Send + Sync {
*/
async fn create_scope(&self, scope: &Scope) -> ResultRetry<bool>;

async fn list_streams(&self, scope: &Scope) -> ResultRetry<Vec<String>>;
/**
* API to list streams under a given scope and continuation token.
* Use the pravega_controller_client::paginator::list_streams to paginate over all the streams.
*/
async fn list_streams(
&self,
scope: &Scope,
token: &CToken,
) -> ResultRetry<Option<(Vec<ScopedStream>, CToken)>>;

/**
* API to delete a scope. Note that a scope can only be deleted in the case is it empty. If
Expand Down Expand Up @@ -326,8 +341,15 @@ impl ControllerClient for ControllerClientImpl {
)
}

async fn list_streams(&self, scope: &Scope) -> ResultRetry<Vec<String>> {
unimplemented!()
async fn list_streams(
&self,
scope: &Scope,
token: &CToken,
) -> ResultRetry<Option<(Vec<ScopedStream>, CToken)>> {
wrap_with_async_retry!(
self.config.retry_policy.max_tries(MAX_RETRIES),
self.call_list_streams(scope, token)
)
}

async fn delete_scope(&self, scope: &Scope) -> ResultRetry<bool> {
Expand Down Expand Up @@ -604,6 +626,59 @@ impl ControllerClientImpl {
})
}

async fn call_list_streams(
&self,
scope: &Scope,
token: &CToken,
) -> Result<Option<(Vec<ScopedStream>, CToken)>> {
let operation_name = "ListStreams";
let request: StreamsInScopeRequest = StreamsInScopeRequest {
scope: Some(ScopeInfo::from(scope)),
continuation_token: Some(ContinuationToken::from(token)),
};
debug!(
"Triggering a request to the controller to list streams for scope {}",
scope
);

let op_status: StdResult<tonic::Response<StreamsInScopeResponse>, tonic::Status> =
self.get_controller_client().list_streams_in_scope(request).await;
match op_status {
Ok(streams_with_token) => {
let result = streams_with_token.into_inner();
let mut t: Vec<StreamInfo> = result.streams;
if t.is_empty() {
// Empty result from the controller implies no further streams present.
Ok(None)
} else {
// update state with the new set of streams.
let stream_list: Vec<ScopedStream> = t.drain(..).map(|i| i.into()).collect();
let token: Option<ContinuationToken> = result.continuation_token;
match token.map(|t| t.token) {
None => {
warn!(
"None returned for continuation token list streams API for scope {}",
scope
);
Err(ControllerError::InvalidResponse {
can_retry: false,
error_msg: "No continuation token received from Controller".to_string(),
})
}
Some(ct) => {
debug!("Returned token {} for list streams API under scope {}", ct, scope);
Ok(Some((stream_list, CToken::from(ct.as_str()))))
}
}
}
}
Err(status) => {
debug!("Error {} while listing streams under scope {}", status, scope);
Err(self.map_grpc_error(operation_name, status).await)
}
}
}

async fn call_create_scope(&self, scope: &Scope) -> Result<bool> {
use create_scope_status::Status;
let operation_name = "CreateScope";
Expand Down Expand Up @@ -1393,6 +1468,18 @@ mod test {
.block_on(controller.get_or_refresh_delegation_token_for(scoped_stream))
.expect("get delegation token");
assert_eq!(res, "123".to_string());

// test list streams
let res = rt
.block_on(controller.list_streams(&scope, &CToken::empty()))
.expect("list streams");
let expected_stream = ScopedStream {
scope,
stream: Stream {
name: String::from("s1"),
},
};
assert_eq!(res, Some((vec![expected_stream], CToken::from("123"))))
}

#[derive(Default)]
Expand Down Expand Up @@ -1625,16 +1712,39 @@ mod test {
}
async fn list_streams_in_scope(
&self,
_request: Request<controller::StreamsInScopeRequest>,
request: Request<controller::StreamsInScopeRequest>,
) -> std::result::Result<Response<controller::StreamsInScopeResponse>, Status> {
let reply = controller::StreamsInScopeResponse {
streams: vec![],
continuation_token: Some(controller::ContinuationToken {
token: "123".to_string(),
}),
status: controller::streams_in_scope_response::Status::Success as i32,
let req = request.into_inner();
let s1 = ScopedStream {
scope: Scope {
name: req.scope.unwrap().scope,
},
stream: Stream {
name: String::from("s1"),
},
};
Ok(Response::new(reply))
let request_token: String = req.continuation_token.unwrap().token;
if request_token.is_empty() {
// first response
let reply = controller::StreamsInScopeResponse {
streams: vec![StreamInfo::from(&s1)],
continuation_token: Some(controller::ContinuationToken {
token: "123".to_string(),
}),
status: controller::streams_in_scope_response::Status::Success as i32,
};
Ok(Response::new(reply))
} else {
// subsequent response
let reply = controller::StreamsInScopeResponse {
streams: vec![],
continuation_token: Some(controller::ContinuationToken {
token: "123".to_string(),
}),
status: controller::streams_in_scope_response::Status::Success as i32,
};
Ok(Response::new(reply))
}
}
async fn remove_writer(
&self,
Expand Down
10 changes: 7 additions & 3 deletions controller-client/src/mock_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,11 @@ impl ControllerClient for MockController {
Ok(true)
}

async fn list_streams(&self, scope: &Scope) -> Result<Vec<String>, RetryError<ControllerError>> {
async fn list_streams(
&self,
scope: &Scope,
_token: &CToken,
) -> Result<Option<(Vec<ScopedStream>, CToken)>, RetryError<ControllerError>> {
let map_guard = self.created_scopes.read().await;
let streams_set = map_guard.get(&scope.name).ok_or(RetryError {
error: ControllerError::OperationError {
Expand All @@ -88,9 +92,9 @@ impl ControllerClient for MockController {
})?;
let mut result = Vec::new();
for stream in streams_set {
result.push(stream.stream.name.clone())
result.push(stream.clone())
}
Ok(result)
Ok(Some((result, CToken::from("mock_token"))))
}

async fn delete_scope(&self, scope: &Scope) -> Result<bool, RetryError<ControllerError>> {
Expand Down
17 changes: 17 additions & 0 deletions controller-client/src/model_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,23 @@ impl<'a> From<&'a ScopedStream> for StreamInfo {
}
}

impl From<StreamInfo> for ScopedStream {
fn from(value: StreamInfo) -> ScopedStream {
ScopedStream {
scope: Scope::from(value.scope),
stream: Stream::from(value.stream),
}
}
}

impl<'a> From<&'a CToken> for ContinuationToken {
fn from(t: &'a CToken) -> ContinuationToken {
ContinuationToken {
token: t.token.to_owned(),
}
}
}

impl<'a> From<&'a Scope> for ScopeInfo {
fn from(value: &'a Scope) -> ScopeInfo {
ScopeInfo {
Expand Down
Loading