Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 225: Implement list streams controller API #242

Merged
merged 16 commits into from
Apr 26, 2021
Prev Previous commit
Next Next commit
Add unit test.
Signed-off-by: Sandeep <[email protected]>
  • Loading branch information
shrids committed Apr 12, 2021
commit 61a0e435a4b03c7fc6f72ec714ec81a07228636c
48 changes: 40 additions & 8 deletions controller-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,9 @@ impl ControllerClientImpl {
}
}
});
// Here stream_result has a stream of futures.
// In the below lines all the futures are computed and the complete list of streams is returned.
// TODO: Check if there a better way to return an async iterator back to the user.

let stream_list = stream_result
.collect::<Vec<StdResult<String, ControllerError>>>()
Expand Down Expand Up @@ -1485,6 +1488,12 @@ mod test {
.block_on(controller.get_or_refresh_delegation_token_for(scoped_stream))
.expect("get delegation token");
assert_eq!(res, "123".to_string());

// test list streams
let res = rt
.block_on(controller.list_streams(&scope))
.expect("list streams");
assert_eq!(res, vec!["s1".to_string()])
}

#[derive(Default)]
Expand Down Expand Up @@ -1717,16 +1726,39 @@ mod test {
}
async fn list_streams_in_scope(
&self,
_request: Request<controller::StreamsInScopeRequest>,
request: Request<controller::StreamsInScopeRequest>,
) -> std::result::Result<Response<controller::StreamsInScopeResponse>, Status> {
let reply = controller::StreamsInScopeResponse {
streams: vec![],
continuation_token: Some(controller::ContinuationToken {
token: "123".to_string(),
}),
status: controller::streams_in_scope_response::Status::Success as i32,
let req = request.into_inner();
let s1 = ScopedStream {
scope: Scope {
name: req.scope.unwrap().scope,
},
stream: Stream {
name: String::from("s1"),
},
};
Ok(Response::new(reply))
let request_token: String = req.continuation_token.unwrap().token;
if request_token.is_empty() {
// first response
let reply = controller::StreamsInScopeResponse {
streams: vec![StreamInfo::from(&s1)],
continuation_token: Some(controller::ContinuationToken {
token: "123".to_string(),
}),
status: controller::streams_in_scope_response::Status::Success as i32,
};
Ok(Response::new(reply))
} else {
// subsequent response
let reply = controller::StreamsInScopeResponse {
streams: vec![],
continuation_token: Some(controller::ContinuationToken {
token: "123".to_string(),
}),
status: controller::streams_in_scope_response::Status::Success as i32,
};
Ok(Response::new(reply))
}
}
async fn remove_writer(
&self,
Expand Down