Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 225: Implement list streams controller API #242

Merged
merged 16 commits into from
Apr 26, 2021
1 change: 0 additions & 1 deletion .github/workflows/python_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ on:
pull_request:
branches:
- master
- enable-tox

name: pythontest

Expand Down
1 change: 1 addition & 0 deletions controller-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ im = "15"
tracing = "0.1"
jsonwebtoken = "7"
serde = {version = "1.0", features = ["derive"] }
futures = "0.3"

[build-dependencies]
tonic-build = "0.4"
Expand Down
118 changes: 103 additions & 15 deletions controller-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ use async_trait::async_trait;
use controller::{
controller_service_client::ControllerServiceClient, create_scope_status, create_stream_status,
delete_scope_status, delete_stream_status, ping_txn_status, scale_request, scale_response,
scale_status_response, txn_state, txn_status, update_stream_status, CreateScopeStatus,
scale_status_response, txn_state, txn_status, update_stream_status, ContinuationToken, CreateScopeStatus,
CreateStreamStatus, CreateTxnRequest, CreateTxnResponse, DelegationToken, DeleteScopeStatus,
DeleteStreamStatus, GetEpochSegmentsRequest, GetSegmentsRequest, NodeUri, PingTxnRequest, PingTxnStatus,
ScaleRequest, ScaleResponse, ScaleStatusRequest, ScaleStatusResponse, ScopeInfo, SegmentId,
SegmentRanges, SegmentsAtTime, StreamConfig, StreamInfo, SuccessorResponse, TxnId, TxnRequest, TxnState,
TxnStatus, UpdateStreamStatus,
SegmentRanges, SegmentsAtTime, StreamConfig, StreamInfo, StreamsInScopeRequest, StreamsInScopeResponse,
SuccessorResponse, TxnId, TxnRequest, TxnState, TxnStatus, UpdateStreamStatus,
};
use im::{HashMap as ImHashMap, OrdMap};
use ordered_float::OrderedFloat;
Expand All @@ -59,7 +59,7 @@ use tokio::runtime::Runtime;
use tonic::codegen::http::uri::InvalidUri;
use tonic::transport::{Certificate, Channel, ClientTlsConfig, Endpoint, Uri};
use tonic::{metadata::MetadataValue, Code, Request, Status};
use tracing::{debug, info};
use tracing::{debug, info, warn};

#[allow(non_camel_case_types)]
pub mod controller {
Expand All @@ -69,6 +69,7 @@ pub mod controller {

pub mod mock_controller;
mod model_helper;
pub mod paginator;

// Max number of retries by the controller in case of a retryable failure.
const MAX_RETRIES: i32 = 10;
Expand Down Expand Up @@ -128,7 +129,11 @@ pub trait ControllerClient: Send + Sync {
*/
async fn create_scope(&self, scope: &Scope) -> ResultRetry<bool>;

async fn list_streams(&self, scope: &Scope) -> ResultRetry<Vec<String>>;
/**
* API to list streams under a given scope and continuation token.
* Use the pravega_controller_client::paginator::list_streams to paginate over all the streams.
*/
async fn list_streams(&self, scope: &Scope, token: &str) -> ResultRetry<Option<(Vec<String>, String)>>;

/**
* API to delete a scope. Note that a scope can only be deleted in the case is it empty. If
Expand Down Expand Up @@ -326,8 +331,11 @@ impl ControllerClient for ControllerClientImpl {
)
}

async fn list_streams(&self, scope: &Scope) -> ResultRetry<Vec<String>> {
unimplemented!()
async fn list_streams(&self, scope: &Scope, token: &str) -> ResultRetry<Option<(Vec<String>, String)>> {
wrap_with_async_retry!(
self.config.retry_policy.max_tries(MAX_RETRIES),
self.call_list_streams(scope, token)
)
}

async fn delete_scope(&self, scope: &Scope) -> ResultRetry<bool> {
Expand Down Expand Up @@ -604,6 +612,57 @@ impl ControllerClientImpl {
})
}

async fn call_list_streams(&self, scope: &Scope, token: &str) -> Result<Option<(Vec<String>, String)>> {
let operation_name = "ListStreams";
let request: StreamsInScopeRequest = StreamsInScopeRequest {
scope: Some(ScopeInfo::from(scope)),
continuation_token: Some(ContinuationToken {
token: token.to_string(),
}),
};
debug!(
"Triggering a request to the controller to list streams for scope {}",
scope
);

let op_status: StdResult<tonic::Response<StreamsInScopeResponse>, tonic::Status> =
self.get_controller_client().list_streams_in_scope(request).await;
match op_status {
Ok(streams_with_token) => {
let result = streams_with_token.into_inner();
let t: Vec<StreamInfo> = result.streams;
if t.is_empty() {
// Empty result from the controller implies no further streams present.
Ok(None)
} else {
// update state with the new set of streams.
let stream_list: Vec<String> =
t.iter().map(|i: &StreamInfo| i.stream.to_string()).collect();

let token: Option<ContinuationToken> = result.continuation_token;
let ct: String = match token.map(|t| t.token) {
None => {
warn!(
"None returned for continuation token list streams API for scope {}",
scope
);
String::from("")
}
Some(t) => {
debug!("Returned token {} for list streams API under scope {}", t, scope);
t
}
};
Ok(Some((stream_list, ct)))
}
}
Err(status) => {
debug!("Error {} while listing streams under scope {}", status, scope);
Err(self.map_grpc_error(operation_name, status).await)
}
}
}

async fn call_create_scope(&self, scope: &Scope) -> Result<bool> {
use create_scope_status::Status;
let operation_name = "CreateScope";
Expand Down Expand Up @@ -1393,6 +1452,12 @@ mod test {
.block_on(controller.get_or_refresh_delegation_token_for(scoped_stream))
.expect("get delegation token");
assert_eq!(res, "123".to_string());

// test list streams
let res = rt
.block_on(controller.list_streams(&scope, &String::from("")))
.expect("list streams");
assert_eq!(res, Some((vec!["s1".to_string()], String::from("123"))))
}

#[derive(Default)]
Expand Down Expand Up @@ -1625,16 +1690,39 @@ mod test {
}
async fn list_streams_in_scope(
&self,
_request: Request<controller::StreamsInScopeRequest>,
request: Request<controller::StreamsInScopeRequest>,
) -> std::result::Result<Response<controller::StreamsInScopeResponse>, Status> {
let reply = controller::StreamsInScopeResponse {
streams: vec![],
continuation_token: Some(controller::ContinuationToken {
token: "123".to_string(),
}),
status: controller::streams_in_scope_response::Status::Success as i32,
let req = request.into_inner();
let s1 = ScopedStream {
scope: Scope {
name: req.scope.unwrap().scope,
},
stream: Stream {
name: String::from("s1"),
},
};
Ok(Response::new(reply))
let request_token: String = req.continuation_token.unwrap().token;
if request_token.is_empty() {
// first response
let reply = controller::StreamsInScopeResponse {
streams: vec![StreamInfo::from(&s1)],
continuation_token: Some(controller::ContinuationToken {
token: "123".to_string(),
}),
status: controller::streams_in_scope_response::Status::Success as i32,
};
Ok(Response::new(reply))
} else {
// subsequent response
let reply = controller::StreamsInScopeResponse {
streams: vec![],
continuation_token: Some(controller::ContinuationToken {
token: "123".to_string(),
}),
status: controller::streams_in_scope_response::Status::Success as i32,
};
Ok(Response::new(reply))
}
}
async fn remove_writer(
&self,
Expand Down
8 changes: 6 additions & 2 deletions controller-client/src/mock_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,11 @@ impl ControllerClient for MockController {
Ok(true)
}

async fn list_streams(&self, scope: &Scope) -> Result<Vec<String>, RetryError<ControllerError>> {
async fn list_streams(
&self,
scope: &Scope,
_token: &str,
) -> Result<Option<(Vec<String>, String)>, RetryError<ControllerError>> {
let map_guard = self.created_scopes.read().await;
let streams_set = map_guard.get(&scope.name).ok_or(RetryError {
error: ControllerError::OperationError {
Expand All @@ -90,7 +94,7 @@ impl ControllerClient for MockController {
for stream in streams_set {
result.push(stream.stream.name.clone())
}
Ok(result)
Ok(Some((result, String::from(""))))
}

async fn delete_scope(&self, scope: &Scope) -> Result<bool, RetryError<ControllerError>> {
Expand Down
121 changes: 121 additions & 0 deletions controller-client/src/paginator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/

use crate::{ControllerClient, ControllerError, ResultRetry};
use futures::prelude::*;
use futures::stream::{self};
use pravega_client_retry::retry_result::RetryError;
use pravega_client_shared::Scope;
use std::vec::IntoIter;
use tracing::info;

///
///Helper method to iterated over the all the Pravega streams under the provided Scope.
///This method returns a stream of values,Pravega streams, produced asynchronously.
///
/// The below snippets show case the example uses.
/// Sample 1:
///```ignore
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want ignore or no_run?

Copy link
Contributor Author

@shrids shrids Apr 22, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used ignore since pravega_client module is not visible here

use pravega_client::client_factory::ClientFactory;
  |      ^^^^^^^^^^^^^^ use of undeclared crate or module `pravega_client`

Is there a way around it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, yeah that a different crate. Sorry I don't think there is a way around that which doesn't involve adding a dev-dependency.

/// # futures::executor::block_on(async {
/// use pravega_client_shared::Scope;
/// use pravega_controller_client::paginator::list_streams;
/// use pravega_client::client_factory::ClientFactory;
/// use pravega_client_config::ClientConfigBuilder;
/// use pravega_client_config::MOCK_CONTROLLER_URI;
/// let config = ClientConfigBuilder::default()
/// .controller_uri(MOCK_CONTROLLER_URI)
/// .build()
/// .expect("creating config");
/// let controller_client = ClientFactory::new(config).get_controller_client();
/// let stream = list_streams(
/// Scope {
/// name: "testScope".to_string(),
/// },
/// controller_client,
/// );
/// // collect all the Streams in a single vector
/// let stream_list:Vec<String> = stream.map(|str| str.unwrap()).collect::<Vec<String>>().await;
/// # });
/// ```
///
/// Sample 2:
/// ```ignore
/// # futures::executor::block_on(async {
/// use pravega_client_shared::Scope;
/// use pravega_controller_client::paginator::list_streams;
/// use pravega_client::client_factory::ClientFactory;
/// use pravega_client_config::ClientConfigBuilder;
/// use pravega_client_config::MOCK_CONTROLLER_URI;
/// use futures::StreamExt;
/// let config = ClientConfigBuilder::default()
/// .controller_uri(MOCK_CONTROLLER_URI)
/// .build()
/// .expect("creating config");
/// let controller_client = ClientFactory::new(config).get_controller_client();
/// let mut stream = list_streams(
/// Scope {
/// name: "testScope".to_string(),
/// },
/// controller_client,
/// );
/// let pravega_stream_1 = stream.next().await;
/// let pravega_stream_2 = stream.next().await;
/// // A None is returned at the end of the stream.
/// # });
/// ```
///
pub fn list_streams(
scope: Scope,
client: &dyn ControllerClient,
) -> impl Stream<Item = Result<String, RetryError<ControllerError>>> + '_ {
struct State {
streams: IntoIter<String>,
scope: Scope,
token: String,
}

// Initial state with an empty Continuation token.
stream::unfold(
State {
streams: Vec::new().into_iter(),
scope,
token: String::from(""),
},
move |mut state| async move {
if let Some(element) = state.streams.next() {
Some((Ok(element), state))
} else {
// execute a request to the controller.
info!(
"Fetch the next set of streams under scope {} using the provided token",
state.scope
);
let res: ResultRetry<Option<(Vec<String>, String)>> =
client.list_streams(&state.scope, &state.token).await;
match res {
Ok(None) => None,
Ok(Some((list, ct))) => {
// create a consuming iterator
let mut stream_iter = list.into_iter();
Some((
Ok(stream_iter.next()?),
State {
streams: stream_iter,
scope: state.scope.clone(),
token: ct,
},
))
}
Err(e) => Some((Err(e), state)),
}
}
},
)
}