diff --git a/.github/workflows/python_test.yml b/.github/workflows/python_test.yml index bd7fc149d..3e1647a2a 100644 --- a/.github/workflows/python_test.yml +++ b/.github/workflows/python_test.yml @@ -8,7 +8,6 @@ on: pull_request: branches: - master - - enable-tox name: pythontest diff --git a/controller-client/Cargo.toml b/controller-client/Cargo.toml index d3c0532b1..a989fc555 100644 --- a/controller-client/Cargo.toml +++ b/controller-client/Cargo.toml @@ -32,6 +32,7 @@ im = "15" tracing = "0.1" jsonwebtoken = "7" serde = {version = "1.0", features = ["derive"] } +futures = "0.3" [build-dependencies] tonic-build = "0.4" diff --git a/controller-client/src/cli.rs b/controller-client/src/cli.rs index 2e2b00978..6c9814b22 100644 --- a/controller-client/src/cli.rs +++ b/controller-client/src/cli.rs @@ -48,6 +48,11 @@ enum Command { #[structopt(help = "Stream Name")] stream_name: String, }, + /// List Streams under a scope + ListStreams { + #[structopt(help = "Scope Name")] + scope_name: String, + }, } #[derive(StructOpt, Debug)] @@ -124,5 +129,22 @@ fn main() { let result = rt.block_on(controller_client.delete_stream(&scoped_stream)); println!("Delete stream status {:?}", result); } + Command::ListStreams { scope_name } => { + use futures::future; + use futures::stream::StreamExt; + use pravega_controller_client::paginator::list_streams; + + let scope = Scope::from(scope_name.clone()); + let stream = list_streams(scope, &controller_client); + println!("Listing streams under scope {:?}", scope_name); + rt.block_on(stream.for_each(|stream| { + if stream.is_ok() { + println!("{:?}", stream.unwrap()); + } else { + println!("Error while fetching data from Controller. Details: {:?}", stream); + } + future::ready(()) + })); + } } } diff --git a/controller-client/src/lib.rs b/controller-client/src/lib.rs index 9c8326048..6bfd7dbfb 100644 --- a/controller-client/src/lib.rs +++ b/controller-client/src/lib.rs @@ -35,12 +35,12 @@ use async_trait::async_trait; use controller::{ controller_service_client::ControllerServiceClient, create_scope_status, create_stream_status, delete_scope_status, delete_stream_status, ping_txn_status, scale_request, scale_response, - scale_status_response, txn_state, txn_status, update_stream_status, CreateScopeStatus, + scale_status_response, txn_state, txn_status, update_stream_status, ContinuationToken, CreateScopeStatus, CreateStreamStatus, CreateTxnRequest, CreateTxnResponse, DelegationToken, DeleteScopeStatus, DeleteStreamStatus, GetEpochSegmentsRequest, GetSegmentsRequest, NodeUri, PingTxnRequest, PingTxnStatus, ScaleRequest, ScaleResponse, ScaleStatusRequest, ScaleStatusResponse, ScopeInfo, SegmentId, - SegmentRanges, SegmentsAtTime, StreamConfig, StreamInfo, SuccessorResponse, TxnId, TxnRequest, TxnState, - TxnStatus, UpdateStreamStatus, + SegmentRanges, SegmentsAtTime, StreamConfig, StreamInfo, StreamsInScopeRequest, StreamsInScopeResponse, + SuccessorResponse, TxnId, TxnRequest, TxnState, TxnStatus, UpdateStreamStatus, }; use im::{HashMap as ImHashMap, OrdMap}; use ordered_float::OrderedFloat; @@ -59,7 +59,7 @@ use tokio::runtime::Runtime; use tonic::codegen::http::uri::InvalidUri; use tonic::transport::{Certificate, Channel, ClientTlsConfig, Endpoint, Uri}; use tonic::{metadata::MetadataValue, Code, Request, Status}; -use tracing::{debug, info}; +use tracing::{debug, info, warn}; #[allow(non_camel_case_types)] pub mod controller { @@ -69,6 +69,7 @@ pub mod controller { pub mod mock_controller; mod model_helper; +pub mod paginator; // Max number of retries by the controller in case of a retryable failure. const MAX_RETRIES: i32 = 10; @@ -89,6 +90,8 @@ pub enum ControllerError { ConnectionError { can_retry: bool, error_msg: String }, #[snafu(display("Invalid configuration passed to the Controller client. Error {}", error_msg))] InvalidConfiguration { can_retry: bool, error_msg: String }, + #[snafu(display("Invalid response from the Controller. Error {}", error_msg))] + InvalidResponse { can_retry: bool, error_msg: String }, } // Implementation of Retryable trait for the error thrown by the Controller. @@ -110,6 +113,10 @@ impl Retryable for ControllerError { can_retry, error_msg: _, } => *can_retry, + InvalidResponse { + can_retry, + error_msg: _, + } => *can_retry, } } } @@ -128,7 +135,15 @@ pub trait ControllerClient: Send + Sync { */ async fn create_scope(&self, scope: &Scope) -> ResultRetry; - async fn list_streams(&self, scope: &Scope) -> ResultRetry>; + /** + * API to list streams under a given scope and continuation token. + * Use the pravega_controller_client::paginator::list_streams to paginate over all the streams. + */ + async fn list_streams( + &self, + scope: &Scope, + token: &CToken, + ) -> ResultRetry, CToken)>>; /** * API to delete a scope. Note that a scope can only be deleted in the case is it empty. If @@ -326,8 +341,15 @@ impl ControllerClient for ControllerClientImpl { ) } - async fn list_streams(&self, scope: &Scope) -> ResultRetry> { - unimplemented!() + async fn list_streams( + &self, + scope: &Scope, + token: &CToken, + ) -> ResultRetry, CToken)>> { + wrap_with_async_retry!( + self.config.retry_policy.max_tries(MAX_RETRIES), + self.call_list_streams(scope, token) + ) } async fn delete_scope(&self, scope: &Scope) -> ResultRetry { @@ -604,6 +626,59 @@ impl ControllerClientImpl { }) } + async fn call_list_streams( + &self, + scope: &Scope, + token: &CToken, + ) -> Result, CToken)>> { + let operation_name = "ListStreams"; + let request: StreamsInScopeRequest = StreamsInScopeRequest { + scope: Some(ScopeInfo::from(scope)), + continuation_token: Some(ContinuationToken::from(token)), + }; + debug!( + "Triggering a request to the controller to list streams for scope {}", + scope + ); + + let op_status: StdResult, tonic::Status> = + self.get_controller_client().list_streams_in_scope(request).await; + match op_status { + Ok(streams_with_token) => { + let result = streams_with_token.into_inner(); + let mut t: Vec = result.streams; + if t.is_empty() { + // Empty result from the controller implies no further streams present. + Ok(None) + } else { + // update state with the new set of streams. + let stream_list: Vec = t.drain(..).map(|i| i.into()).collect(); + let token: Option = result.continuation_token; + match token.map(|t| t.token) { + None => { + warn!( + "None returned for continuation token list streams API for scope {}", + scope + ); + Err(ControllerError::InvalidResponse { + can_retry: false, + error_msg: "No continuation token received from Controller".to_string(), + }) + } + Some(ct) => { + debug!("Returned token {} for list streams API under scope {}", ct, scope); + Ok(Some((stream_list, CToken::from(ct.as_str())))) + } + } + } + } + Err(status) => { + debug!("Error {} while listing streams under scope {}", status, scope); + Err(self.map_grpc_error(operation_name, status).await) + } + } + } + async fn call_create_scope(&self, scope: &Scope) -> Result { use create_scope_status::Status; let operation_name = "CreateScope"; @@ -1393,6 +1468,18 @@ mod test { .block_on(controller.get_or_refresh_delegation_token_for(scoped_stream)) .expect("get delegation token"); assert_eq!(res, "123".to_string()); + + // test list streams + let res = rt + .block_on(controller.list_streams(&scope, &CToken::empty())) + .expect("list streams"); + let expected_stream = ScopedStream { + scope, + stream: Stream { + name: String::from("s1"), + }, + }; + assert_eq!(res, Some((vec![expected_stream], CToken::from("123")))) } #[derive(Default)] @@ -1625,16 +1712,39 @@ mod test { } async fn list_streams_in_scope( &self, - _request: Request, + request: Request, ) -> std::result::Result, Status> { - let reply = controller::StreamsInScopeResponse { - streams: vec![], - continuation_token: Some(controller::ContinuationToken { - token: "123".to_string(), - }), - status: controller::streams_in_scope_response::Status::Success as i32, + let req = request.into_inner(); + let s1 = ScopedStream { + scope: Scope { + name: req.scope.unwrap().scope, + }, + stream: Stream { + name: String::from("s1"), + }, }; - Ok(Response::new(reply)) + let request_token: String = req.continuation_token.unwrap().token; + if request_token.is_empty() { + // first response + let reply = controller::StreamsInScopeResponse { + streams: vec![StreamInfo::from(&s1)], + continuation_token: Some(controller::ContinuationToken { + token: "123".to_string(), + }), + status: controller::streams_in_scope_response::Status::Success as i32, + }; + Ok(Response::new(reply)) + } else { + // subsequent response + let reply = controller::StreamsInScopeResponse { + streams: vec![], + continuation_token: Some(controller::ContinuationToken { + token: "123".to_string(), + }), + status: controller::streams_in_scope_response::Status::Success as i32, + }; + Ok(Response::new(reply)) + } } async fn remove_writer( &self, diff --git a/controller-client/src/mock_controller.rs b/controller-client/src/mock_controller.rs index 91c26660f..34c724e4f 100644 --- a/controller-client/src/mock_controller.rs +++ b/controller-client/src/mock_controller.rs @@ -75,7 +75,11 @@ impl ControllerClient for MockController { Ok(true) } - async fn list_streams(&self, scope: &Scope) -> Result, RetryError> { + async fn list_streams( + &self, + scope: &Scope, + _token: &CToken, + ) -> Result, CToken)>, RetryError> { let map_guard = self.created_scopes.read().await; let streams_set = map_guard.get(&scope.name).ok_or(RetryError { error: ControllerError::OperationError { @@ -88,9 +92,9 @@ impl ControllerClient for MockController { })?; let mut result = Vec::new(); for stream in streams_set { - result.push(stream.stream.name.clone()) + result.push(stream.clone()) } - Ok(result) + Ok(Some((result, CToken::from("mock_token")))) } async fn delete_scope(&self, scope: &Scope) -> Result> { diff --git a/controller-client/src/model_helper.rs b/controller-client/src/model_helper.rs index 64feeeaef..f2ce458ad 100644 --- a/controller-client/src/model_helper.rs +++ b/controller-client/src/model_helper.rs @@ -71,6 +71,23 @@ impl<'a> From<&'a ScopedStream> for StreamInfo { } } +impl From for ScopedStream { + fn from(value: StreamInfo) -> ScopedStream { + ScopedStream { + scope: Scope::from(value.scope), + stream: Stream::from(value.stream), + } + } +} + +impl<'a> From<&'a CToken> for ContinuationToken { + fn from(t: &'a CToken) -> ContinuationToken { + ContinuationToken { + token: t.token.to_owned(), + } + } +} + impl<'a> From<&'a Scope> for ScopeInfo { fn from(value: &'a Scope) -> ScopeInfo { ScopeInfo { diff --git a/controller-client/src/paginator.rs b/controller-client/src/paginator.rs new file mode 100644 index 000000000..21d1dbd51 --- /dev/null +++ b/controller-client/src/paginator.rs @@ -0,0 +1,131 @@ +/* + * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ + +use crate::{ControllerClient, ControllerError, ResultRetry}; +use futures::prelude::*; +use futures::stream::{self}; +use pravega_client_retry::retry_result::RetryError; +use pravega_client_shared::{CToken, Scope, ScopedStream}; +use std::vec::IntoIter; +use tracing::error; +use tracing::info; + +/// +///Helper method to iterated over the all the Pravega streams under the provided Scope. +///This method returns a stream of values,Pravega streams, produced asynchronously. +/// +/// The below snippets show case the example uses. +/// Sample 1: +///```ignore +/// # futures::executor::block_on(async { +/// use pravega_client_shared::Scope; +/// use pravega_client_shared::ScopedStream; +/// use pravega_controller_client::paginator::list_streams; +/// use pravega_client::client_factory::ClientFactory; +/// use pravega_client_config::ClientConfigBuilder; +/// use pravega_client_config::MOCK_CONTROLLER_URI; +/// let config = ClientConfigBuilder::default() +/// .controller_uri(MOCK_CONTROLLER_URI) +/// .build() +/// .expect("creating config"); +/// let controller_client = ClientFactory::new(config).get_controller_client(); +/// let stream = list_streams( +/// Scope { +/// name: "testScope".to_string(), +/// }, +/// controller_client, +/// ); +/// // collect all the Streams in a single vector +/// let stream_list:Vec = stream.map(|str| str.unwrap()).collect::>().await; +/// # }); +/// ``` +/// +/// Sample 2: +/// ```ignore +/// # futures::executor::block_on(async { +/// use pravega_client_shared::Scope; +/// use pravega_controller_client::paginator::list_streams; +/// use pravega_client::client_factory::ClientFactory; +/// use pravega_client_config::ClientConfigBuilder; +/// use pravega_client_config::MOCK_CONTROLLER_URI; +/// use futures::StreamExt; +/// let config = ClientConfigBuilder::default() +/// .controller_uri(MOCK_CONTROLLER_URI) +/// .build() +/// .expect("creating config"); +/// let controller_client = ClientFactory::new(config).get_controller_client(); +/// let mut stream = list_streams( +/// Scope { +/// name: "testScope".to_string(), +/// }, +/// controller_client, +/// ); +/// let pravega_stream_1 = stream.next().await; +/// let pravega_stream_2 = stream.next().await; +/// // A None is returned at the end of the stream. +/// # }); +/// ``` +/// +pub fn list_streams( + scope: Scope, + client: &dyn ControllerClient, +) -> impl Stream>> + '_ { + struct State { + streams: IntoIter, + scope: Scope, + token: CToken, + } + + // Initial state with an empty Continuation token. + let get_next_stream_async = move |mut state: State| async move { + if let Some(element) = state.streams.next() { + Some((Ok(element), state)) + } else { + // execute a request to the controller. + info!( + "Fetch the next set of streams under scope {} using the provided token", + state.scope + ); + let res: ResultRetry, CToken)>> = + client.list_streams(&state.scope, &state.token).await; + match res { + Ok(None) => None, + Ok(Some((list, ct))) => { + // create a consuming iterator + let mut stream_iter = list.into_iter(); + Some(( + Ok(stream_iter.next()?), + State { + streams: stream_iter, + scope: state.scope.clone(), + token: ct, + }, + )) + } + Err(e) => { + //log an error and return None to indicate end of stream. + error!( + "Error while attempting to list streams for scope {}. Error: {:?}", + state.scope, e + ); + None + } + } + } + }; + stream::unfold( + State { + streams: Vec::new().into_iter(), + scope, + token: CToken::empty(), + }, + get_next_stream_async, + ) +} diff --git a/shared/src/lib.rs b/shared/src/lib.rs index df34eae74..b9122909b 100644 --- a/shared/src/lib.rs +++ b/shared/src/lib.rs @@ -189,6 +189,31 @@ impl From<&str> for ScopedStream { } } +/// +/// This represents the continuation token returned by the controller +/// as part of the list streams grpc API. +/// +#[derive(new, Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)] +pub struct CToken { + pub token: String, +} + +impl CToken { + pub fn empty() -> CToken { + CToken { + token: String::from(""), + } + } +} + +impl From<&str> for CToken { + fn from(string: &str) -> Self { + CToken { + token: string.to_string(), + } + } +} + #[derive(new, Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)] pub struct ScopedSegment { pub scope: Scope,