Skip to content

Commit

Permalink
Issue 225: Implement list streams controller API (#242)
Browse files Browse the repository at this point in the history
Enable List Streams Controller API on the Rust Client.
Create a Paginator utility which returns a impl Stream<Item = Result<String, RetryError<ControllerError>>> to handle huge stream lists with pagination.
Enable List Streams option in the Controller CLI

Signed-off-by: Sandeep <[email protected]>
  • Loading branch information
shrids authored Apr 26, 2021
1 parent fff2b36 commit a026f15
Show file tree
Hide file tree
Showing 8 changed files with 328 additions and 19 deletions.
1 change: 0 additions & 1 deletion .github/workflows/python_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ on:
pull_request:
branches:
- master
- enable-tox

name: pythontest

Expand Down
1 change: 1 addition & 0 deletions controller-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ im = "15"
tracing = "0.1"
jsonwebtoken = "7"
serde = {version = "1.0", features = ["derive"] }
futures = "0.3"

[build-dependencies]
tonic-build = "0.4"
Expand Down
22 changes: 22 additions & 0 deletions controller-client/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ enum Command {
#[structopt(help = "Stream Name")]
stream_name: String,
},
/// List Streams under a scope
ListStreams {
#[structopt(help = "Scope Name")]
scope_name: String,
},
}

#[derive(StructOpt, Debug)]
Expand Down Expand Up @@ -124,5 +129,22 @@ fn main() {
let result = rt.block_on(controller_client.delete_stream(&scoped_stream));
println!("Delete stream status {:?}", result);
}
Command::ListStreams { scope_name } => {
use futures::future;
use futures::stream::StreamExt;
use pravega_controller_client::paginator::list_streams;

let scope = Scope::from(scope_name.clone());
let stream = list_streams(scope, &controller_client);
println!("Listing streams under scope {:?}", scope_name);
rt.block_on(stream.for_each(|stream| {
if stream.is_ok() {
println!("{:?}", stream.unwrap());
} else {
println!("Error while fetching data from Controller. Details: {:?}", stream);
}
future::ready(())
}));
}
}
}
140 changes: 125 additions & 15 deletions controller-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ use async_trait::async_trait;
use controller::{
controller_service_client::ControllerServiceClient, create_scope_status, create_stream_status,
delete_scope_status, delete_stream_status, ping_txn_status, scale_request, scale_response,
scale_status_response, txn_state, txn_status, update_stream_status, CreateScopeStatus,
scale_status_response, txn_state, txn_status, update_stream_status, ContinuationToken, CreateScopeStatus,
CreateStreamStatus, CreateTxnRequest, CreateTxnResponse, DelegationToken, DeleteScopeStatus,
DeleteStreamStatus, GetEpochSegmentsRequest, GetSegmentsRequest, NodeUri, PingTxnRequest, PingTxnStatus,
ScaleRequest, ScaleResponse, ScaleStatusRequest, ScaleStatusResponse, ScopeInfo, SegmentId,
SegmentRanges, SegmentsAtTime, StreamConfig, StreamInfo, SuccessorResponse, TxnId, TxnRequest, TxnState,
TxnStatus, UpdateStreamStatus,
SegmentRanges, SegmentsAtTime, StreamConfig, StreamInfo, StreamsInScopeRequest, StreamsInScopeResponse,
SuccessorResponse, TxnId, TxnRequest, TxnState, TxnStatus, UpdateStreamStatus,
};
use im::{HashMap as ImHashMap, OrdMap};
use ordered_float::OrderedFloat;
Expand All @@ -59,7 +59,7 @@ use tokio::runtime::Runtime;
use tonic::codegen::http::uri::InvalidUri;
use tonic::transport::{Certificate, Channel, ClientTlsConfig, Endpoint, Uri};
use tonic::{metadata::MetadataValue, Code, Request, Status};
use tracing::{debug, info};
use tracing::{debug, info, warn};

#[allow(non_camel_case_types)]
pub mod controller {
Expand All @@ -69,6 +69,7 @@ pub mod controller {

pub mod mock_controller;
mod model_helper;
pub mod paginator;

// Max number of retries by the controller in case of a retryable failure.
const MAX_RETRIES: i32 = 10;
Expand All @@ -89,6 +90,8 @@ pub enum ControllerError {
ConnectionError { can_retry: bool, error_msg: String },
#[snafu(display("Invalid configuration passed to the Controller client. Error {}", error_msg))]
InvalidConfiguration { can_retry: bool, error_msg: String },
#[snafu(display("Invalid response from the Controller. Error {}", error_msg))]
InvalidResponse { can_retry: bool, error_msg: String },
}

// Implementation of Retryable trait for the error thrown by the Controller.
Expand All @@ -110,6 +113,10 @@ impl Retryable for ControllerError {
can_retry,
error_msg: _,
} => *can_retry,
InvalidResponse {
can_retry,
error_msg: _,
} => *can_retry,
}
}
}
Expand All @@ -128,7 +135,15 @@ pub trait ControllerClient: Send + Sync {
*/
async fn create_scope(&self, scope: &Scope) -> ResultRetry<bool>;

async fn list_streams(&self, scope: &Scope) -> ResultRetry<Vec<String>>;
/**
* API to list streams under a given scope and continuation token.
* Use the pravega_controller_client::paginator::list_streams to paginate over all the streams.
*/
async fn list_streams(
&self,
scope: &Scope,
token: &CToken,
) -> ResultRetry<Option<(Vec<ScopedStream>, CToken)>>;

/**
* API to delete a scope. Note that a scope can only be deleted in the case is it empty. If
Expand Down Expand Up @@ -326,8 +341,15 @@ impl ControllerClient for ControllerClientImpl {
)
}

async fn list_streams(&self, scope: &Scope) -> ResultRetry<Vec<String>> {
unimplemented!()
async fn list_streams(
&self,
scope: &Scope,
token: &CToken,
) -> ResultRetry<Option<(Vec<ScopedStream>, CToken)>> {
wrap_with_async_retry!(
self.config.retry_policy.max_tries(MAX_RETRIES),
self.call_list_streams(scope, token)
)
}

async fn delete_scope(&self, scope: &Scope) -> ResultRetry<bool> {
Expand Down Expand Up @@ -604,6 +626,59 @@ impl ControllerClientImpl {
})
}

async fn call_list_streams(
&self,
scope: &Scope,
token: &CToken,
) -> Result<Option<(Vec<ScopedStream>, CToken)>> {
let operation_name = "ListStreams";
let request: StreamsInScopeRequest = StreamsInScopeRequest {
scope: Some(ScopeInfo::from(scope)),
continuation_token: Some(ContinuationToken::from(token)),
};
debug!(
"Triggering a request to the controller to list streams for scope {}",
scope
);

let op_status: StdResult<tonic::Response<StreamsInScopeResponse>, tonic::Status> =
self.get_controller_client().list_streams_in_scope(request).await;
match op_status {
Ok(streams_with_token) => {
let result = streams_with_token.into_inner();
let mut t: Vec<StreamInfo> = result.streams;
if t.is_empty() {
// Empty result from the controller implies no further streams present.
Ok(None)
} else {
// update state with the new set of streams.
let stream_list: Vec<ScopedStream> = t.drain(..).map(|i| i.into()).collect();
let token: Option<ContinuationToken> = result.continuation_token;
match token.map(|t| t.token) {
None => {
warn!(
"None returned for continuation token list streams API for scope {}",
scope
);
Err(ControllerError::InvalidResponse {
can_retry: false,
error_msg: "No continuation token received from Controller".to_string(),
})
}
Some(ct) => {
debug!("Returned token {} for list streams API under scope {}", ct, scope);
Ok(Some((stream_list, CToken::from(ct.as_str()))))
}
}
}
}
Err(status) => {
debug!("Error {} while listing streams under scope {}", status, scope);
Err(self.map_grpc_error(operation_name, status).await)
}
}
}

async fn call_create_scope(&self, scope: &Scope) -> Result<bool> {
use create_scope_status::Status;
let operation_name = "CreateScope";
Expand Down Expand Up @@ -1393,6 +1468,18 @@ mod test {
.block_on(controller.get_or_refresh_delegation_token_for(scoped_stream))
.expect("get delegation token");
assert_eq!(res, "123".to_string());

// test list streams
let res = rt
.block_on(controller.list_streams(&scope, &CToken::empty()))
.expect("list streams");
let expected_stream = ScopedStream {
scope,
stream: Stream {
name: String::from("s1"),
},
};
assert_eq!(res, Some((vec![expected_stream], CToken::from("123"))))
}

#[derive(Default)]
Expand Down Expand Up @@ -1625,16 +1712,39 @@ mod test {
}
async fn list_streams_in_scope(
&self,
_request: Request<controller::StreamsInScopeRequest>,
request: Request<controller::StreamsInScopeRequest>,
) -> std::result::Result<Response<controller::StreamsInScopeResponse>, Status> {
let reply = controller::StreamsInScopeResponse {
streams: vec![],
continuation_token: Some(controller::ContinuationToken {
token: "123".to_string(),
}),
status: controller::streams_in_scope_response::Status::Success as i32,
let req = request.into_inner();
let s1 = ScopedStream {
scope: Scope {
name: req.scope.unwrap().scope,
},
stream: Stream {
name: String::from("s1"),
},
};
Ok(Response::new(reply))
let request_token: String = req.continuation_token.unwrap().token;
if request_token.is_empty() {
// first response
let reply = controller::StreamsInScopeResponse {
streams: vec![StreamInfo::from(&s1)],
continuation_token: Some(controller::ContinuationToken {
token: "123".to_string(),
}),
status: controller::streams_in_scope_response::Status::Success as i32,
};
Ok(Response::new(reply))
} else {
// subsequent response
let reply = controller::StreamsInScopeResponse {
streams: vec![],
continuation_token: Some(controller::ContinuationToken {
token: "123".to_string(),
}),
status: controller::streams_in_scope_response::Status::Success as i32,
};
Ok(Response::new(reply))
}
}
async fn remove_writer(
&self,
Expand Down
10 changes: 7 additions & 3 deletions controller-client/src/mock_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,11 @@ impl ControllerClient for MockController {
Ok(true)
}

async fn list_streams(&self, scope: &Scope) -> Result<Vec<String>, RetryError<ControllerError>> {
async fn list_streams(
&self,
scope: &Scope,
_token: &CToken,
) -> Result<Option<(Vec<ScopedStream>, CToken)>, RetryError<ControllerError>> {
let map_guard = self.created_scopes.read().await;
let streams_set = map_guard.get(&scope.name).ok_or(RetryError {
error: ControllerError::OperationError {
Expand All @@ -88,9 +92,9 @@ impl ControllerClient for MockController {
})?;
let mut result = Vec::new();
for stream in streams_set {
result.push(stream.stream.name.clone())
result.push(stream.clone())
}
Ok(result)
Ok(Some((result, CToken::from("mock_token"))))
}

async fn delete_scope(&self, scope: &Scope) -> Result<bool, RetryError<ControllerError>> {
Expand Down
17 changes: 17 additions & 0 deletions controller-client/src/model_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,23 @@ impl<'a> From<&'a ScopedStream> for StreamInfo {
}
}

impl From<StreamInfo> for ScopedStream {
fn from(value: StreamInfo) -> ScopedStream {
ScopedStream {
scope: Scope::from(value.scope),
stream: Stream::from(value.stream),
}
}
}

impl<'a> From<&'a CToken> for ContinuationToken {
fn from(t: &'a CToken) -> ContinuationToken {
ContinuationToken {
token: t.token.to_owned(),
}
}
}

impl<'a> From<&'a Scope> for ScopeInfo {
fn from(value: &'a Scope) -> ScopeInfo {
ScopeInfo {
Expand Down
Loading

0 comments on commit a026f15

Please sign in to comment.