Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 83: Enable Python bindings for StreamManager and EventWriter. #104

Merged
merged 19 commits into from
Jun 3, 2020
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,6 @@ pravega-*.tgz
.vscode
/shared/target/
.gradle
*.log

__pycache__/
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ version = "0.1.0"
edition = "2018"
categories = ["Network programming"]
keywords = ["streaming", "client", "pravega"]
readme = "Readme.md"
readme = "README.md"
repository = "https://github.com/pravega/pravega-client-rust"
license = "Apache-2.0"
description = "A Rust client for Pravega. (Pravega.io)"
Expand All @@ -15,7 +15,7 @@ authors = ["Tom Kaitchuck <[email protected]>", "Wenqi Mou <wenqi.mou@dell.

[workspace]
members = [
"controller-client", "shared", "wire_protocol", "retry", "integration_test", "connection_pool", "channel",
"controller-client", "shared", "wire_protocol", "retry", "integration_test", "connection_pool", "channel", "bindings"
]

[dependencies]
Expand Down
2 changes: 1 addition & 1 deletion benches/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ fn mock_connection_no_block(c: &mut Criterion) {
async fn set_up(config: ClientConfig) -> EventStreamWriter {
let scope_name = Scope::new("testWriterPerf".into());
let stream_name = Stream::new("testWriterPerf".into());
let client_factory = ClientFactory::new(config.clone());
let client_factory = ClientFactory::new(config.clone()).await;
let controller_client = client_factory.get_controller_client();
create_scope_stream(controller_client, &scope_name, &stream_name, 1).await;
let scoped_stream = ScopedStream {
Expand Down
36 changes: 36 additions & 0 deletions bindings/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
[package]
name = "bindings"
shrids marked this conversation as resolved.
Show resolved Hide resolved
version = "0.1.0"
edition = "2018"
categories = ["Network programming"]
keywords = ["streaming", "client", "pravega"]
readme = "Readme.md"
repository = "https://github.com/pravega/pravega-client-rust"
license = "Apache-2.0"
description = "An internal library used by the Rust client for Pravega to generated language bindings for Python and WASM."
authors = ["Tom Kaitchuck <[email protected]>", "Wenqi Mou <[email protected]>",
"Sandeep Shridhar <[email protected]>", "Wenxiao Zhang <[email protected]>"]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[lib]
name = "pravega_client"
crate-type = ["cdylib"]

[dependencies.pyo3]
version = "0.10.1"
features = ["extension-module"]

[dependencies]
log = "0.4"
pravega-client-rust = { path = "../" }
pravega-wire-protocol = { path = "../wire_protocol"}
pravega-controller-client = { path = "../controller-client"}
pravega-rust-client-shared = { path = "../shared"}
pravega-rust-client-retry = {path = "../retry"}
pravega-connection-pool = {path= "../connection_pool"}
tokio = { version = "0.2.13", features = ["full"] }
lazy_static = "1.4.0"
uuid = {version = "0.8", features = ["v4"]}
futures = "0.3.5"
derive-new = "0.5"
3 changes: 3 additions & 0 deletions bindings/Readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Language Bindings

This provides a way to generate multiple language bindings to interact with Pravega.
3 changes: 3 additions & 0 deletions bindings/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[build-system]
requires = ["maturin"]
build-backend = "maturin"
13 changes: 13 additions & 0 deletions bindings/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
//
// Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//

mod python_binding;
#[macro_use]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can probably add a dummy main to keep the test suite happy.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tried with a dummy main, it too fails. We need to use the workaround mentioined @ https://pyo3.rs/master/advanced.html#testing

extern crate derive_new;
36 changes: 36 additions & 0 deletions bindings/src/pravega_client_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#
# Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#

import unittest
import pravega_client;

class PravegaTest(unittest.TestCase):
def test_writeEvent(self):
print("Creating a Stream Manager, ensure pravega is running")
stream_manager=pravega_client.StreamManager("127.0.0.1:9090")

print("Creating a scope")
scope_result=stream_manager.create_scope("testScope")
self.assertEqual(True, scope_result, "Scope creation status")

print("Creating a stream")
stream_result=stream_manager.create_stream("testScope", "testStream", 1)
self.assertEqual(True, stream_result, "Stream creation status")

print("Creating a writer for Stream")
w1=stream_manager.create_writer("testScope","testStream")

print("Write events")
w1.write_event("test event1")
w1.write_event("test event2")


if __name__ == '__main__':
unittest.main()
31 changes: 31 additions & 0 deletions bindings/src/python_binding/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
//
// Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//

use pyo3::prelude::*;
use pyo3::wrap_pyfunction;
mod stream_manager;
mod stream_writer;
use stream_manager::StreamManager;
use stream_writer::StreamWriter;

/// Formats the sum of two numbers as string.
#[pyfunction]
fn sum_as_string(a: usize, b: usize) -> PyResult<String> {
shrids marked this conversation as resolved.
Show resolved Hide resolved
Ok((a + b).to_string())
}

#[pymodule]
/// A Python module implemented in Rust.
fn pravega_client(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_wrapped(wrap_pyfunction!(sum_as_string))?;
m.add_class::<StreamManager>()?;
m.add_class::<StreamWriter>()?;
Ok(())
}
188 changes: 188 additions & 0 deletions bindings/src/python_binding/stream_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
//
// Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//

use crate::python_binding::stream_writer::StreamWriter;
use pravega_client_rust::client_factory::ClientFactory;
use pravega_rust_client_shared::*;
use pravega_wire_protocol::client_config::{ClientConfig, ClientConfigBuilder};
use pyo3::exceptions;
use pyo3::prelude::*;
use pyo3::PyResult;
use std::net::SocketAddr;
use tokio::runtime::Runtime;

#[pyclass]
pub(crate) struct StreamManager {
_controller_ip: String,
rt: Runtime,
cf: ClientFactory,
config: ClientConfig,
}

#[pymethods]
impl StreamManager {
#[new]
fn new(controller_uri: String) -> Self {
let runtime = tokio::runtime::Runtime::new().expect("create runtime");
let handle = runtime.handle().clone();
let config = ClientConfigBuilder::default()
.controller_uri(
controller_uri
.parse::<SocketAddr>()
.expect("Parsing controller ip"),
)
.build()
.expect("creating config");
let client_factory = handle.block_on(ClientFactory::new(config.clone()));

StreamManager {
_controller_ip: controller_uri,
rt: runtime,
cf: client_factory,
config,
}
}

///
/// Create a Scope in Pravega.
///
pub fn create_scope(&self, scope_name: &str) -> PyResult<bool> {
let handle = self.rt.handle().clone();
println!("creating scope {:?}", scope_name);

let controller = self.cf.get_controller_client();
let scope_name = Scope::new(scope_name.to_string());

let scope_result = handle.block_on(controller.create_scope(&scope_name));
println!("Scope creation status {:?}", scope_result);
match scope_result {
Ok(t) => Ok(t),
Err(e) => Err(exceptions::ValueError::py_err(format!("{:?}", e))),
}
}

///
/// Delete a Scope in Pravega.
///
pub fn delete_scope(&self, scope_name: &str) -> PyResult<bool> {
let handle = self.rt.handle().clone();
println!("Delete scope {:?}", scope_name);

let controller = self.cf.get_controller_client();
let scope_name = Scope::new(scope_name.to_string());
// let stream_name = Stream::new("testStream".into());

let scope_result = handle.block_on(controller.delete_scope(&scope_name));
println!("Scope deletion status {:?}", scope_result);
match scope_result {
Ok(t) => Ok(t),
Err(e) => Err(exceptions::ValueError::py_err(format!("{:?}", e))),
}
}

///
/// Create a Stream in Pravega.
///
pub fn create_stream(
&self,
scope_name: &str,
stream_name: &str,
initial_segments: i32,
) -> PyResult<bool> {
let handle = self.rt.handle().clone();
println!(
"creating stream {:?} under scope {:?} with segment count {:?}",
stream_name, scope_name, initial_segments
);
let stream_cfg = StreamConfiguration {
scoped_stream: ScopedStream {
scope: Scope::new(scope_name.to_string()),
stream: Stream::new(stream_name.to_string()),
},
scaling: Scaling {
scale_type: ScaleType::FixedNumSegments,
target_rate: 0,
scale_factor: 0,
min_num_segments: initial_segments,
},
retention: Retention {
retention_type: RetentionType::None,
retention_param: 0,
},
};
let controller = self.cf.get_controller_client();

let stream_result = handle.block_on(controller.create_stream(&stream_cfg));
println!("Stream creation status {:?}", stream_result);
match stream_result {
Ok(t) => Ok(t),
Err(e) => Err(exceptions::ValueError::py_err(format!("{:?}", e))),
}
}

///
/// Create a Stream in Pravega.
///
pub fn seal_stream(&self, scope_name: &str, stream_name: &str) -> PyResult<bool> {
let handle = self.rt.handle().clone();
println!("Sealing stream {:?} under scope {:?} ", stream_name, scope_name);
let scoped_stream = ScopedStream {
scope: Scope::new(scope_name.to_string()),
stream: Stream::new(stream_name.to_string()),
};

let controller = self.cf.get_controller_client();

let stream_result = handle.block_on(controller.seal_stream(&scoped_stream));
println!("Sealing stream status {:?}", stream_result);
match stream_result {
Ok(t) => Ok(t),
Err(e) => Err(exceptions::ValueError::py_err(format!("{:?}", e))),
}
}

///
/// Delete a Stream in Pravega.
///
pub fn delete_stream(&self, scope_name: &str, stream_name: &str) -> PyResult<bool> {
let handle = self.rt.handle().clone();
println!("Deleting stream {:?} under scope {:?} ", stream_name, scope_name);
let scoped_stream = ScopedStream {
scope: Scope::new(scope_name.to_string()),
stream: Stream::new(stream_name.to_string()),
};

let controller = self.cf.get_controller_client();
let stream_result = handle.block_on(controller.delete_stream(&scoped_stream));
println!("Deleting stream status {:?}", stream_result);
match stream_result {
Ok(t) => Ok(t),
Err(e) => Err(exceptions::ValueError::py_err(format!("{:?}", e))),
}
}

///
/// Create a Writer for a given Stream.
///
pub fn create_writer(&self, scope_name: &str, stream_name: &str) -> PyResult<StreamWriter> {
let scoped_stream = ScopedStream {
scope: Scope::new(scope_name.to_string()),
stream: Stream::new(stream_name.to_string()),
};
let stream_writer = self.rt.handle().clone().block_on(async {
StreamWriter::new(
self.cf
.create_event_stream_writer(scoped_stream, self.config.clone()),
self.rt.handle().clone(),
)
});
Ok(stream_writer)
}
}
Loading