Skip to content
This repository was archived by the owner on Nov 6, 2020. It is now read-only.

fix(parity-clib): grumbles that were not addressed in #9920 #10154

Merged
merged 5 commits into from
Feb 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

102 changes: 26 additions & 76 deletions parity-clib/src/java.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,31 @@
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.

use std::{mem, ptr};
use std::ffi::c_void;
use std::sync::Arc;
use std::time::Duration;
use std::thread;
use std::os::raw::c_void;

use {parity_config_from_cli, parity_destroy, parity_set_logger, parity_start, parity_unsubscribe_ws, ParityParams, error};
use {Callback, parity_config_from_cli, parity_destroy, parity_rpc_worker, parity_start, parity_set_logger,
parity_unsubscribe_ws, parity_ws_worker, ParityParams};

use futures::{Future, Stream};
use futures::sync::mpsc;
use jni::{JavaVM, JNIEnv};
use jni::objects::{JClass, JString, JObject, JValue, GlobalRef};
use jni::sys::{jlong, jobjectArray, va_list};
use tokio_current_thread::CurrentThread;
use parity_ethereum::{RunningClient, PubSubSession};
use parity_ethereum::RunningClient;

type CheckedQuery<'a> = (&'a RunningClient, String, JavaVM, GlobalRef);

// Creates a Java callback to a static method named `void callback(Object)`
struct Callback<'a> {
struct JavaCallback<'a> {
jvm: JavaVM,
callback: GlobalRef,
method_name: &'a str,
method_descriptor: &'a str,
}

unsafe impl<'a> Send for Callback<'a> {}
unsafe impl<'a> Sync for Callback<'a> {}
impl<'a> Callback<'a> {
unsafe impl<'a> Send for JavaCallback<'a> {}
unsafe impl<'a> Sync for JavaCallback<'a> {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It wasn't introduced by this PR, but could you explain why it's safe for GlobalRef?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand what you mean. Can you elaborate a bit?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I meant why is it safe to impl Sync and Send for JavaCallback?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is a good question I have to admit I'm not completely sure (I more or less copy-pasted it from the lib.rs) but I argue that JavaVM and GlobalRef is never mutated and the Java code is thread-safe!

Also both JavaVM GlobalRef have unsafe Send + Sync impls but an inner-type GlobalRefGuard doesn't with the unsafe impls I get the following error:

53 | impl<'a> Callback for JavaCallback<'a> {
   |          ^^^^^^^^ `*mut jni::sys::_jobject` cannot be shared between threads safely
   |
   = help: within `jni::objects::global_ref::GlobalRefGuard`, the trait `std::marker::Sync` is not implemented for `*mut jni::sys::_jobject`
   = note: required because it appears within the type `jni::objects::JObject<'static>`
   = note: required because it appears within the type `jni::objects::global_ref::GlobalRefGuard`
   = note: required because of the requirements on the impl of `std::marker::Sync` for `std::sync::Arc<jni::objects::global_ref::GlobalRefGuard>`
   = note: required because it appears within the type `jni::objects::GlobalRef`
   = note: required because it appears within the type `java::JavaCallback<'a>`

Thoughts?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason I asked, is because the docs for GlobalRef 0.10.2 showed

impl !Sync for GlobalRef 

It seems like Sync was added later (jni-rs/jni-rs#145), and the implementation hasn't changed for GlobalRef.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks good point let’s ask them to bump the version so we can get rid of the unsafe part then :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @DarkEld3r


impl<'a> JavaCallback<'a> {
fn new(jvm: JavaVM, callback: GlobalRef) -> Self {
Self {
jvm,
Expand All @@ -51,7 +48,9 @@ impl<'a> Callback<'a> {
method_descriptor: "(Ljava/lang/Object;)V",
}
}
}

impl<'a> Callback for JavaCallback<'a> {
fn call(&self, msg: &str) {
let env = self.jvm.attach_current_thread().expect("JavaVM should have an environment; qed");
let java_str = env.new_string(msg.to_string()).expect("Rust String is valid JString; qed");
Expand All @@ -63,21 +62,21 @@ impl<'a> Callback<'a> {

#[no_mangle]
pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_configFromCli(env: JNIEnv, _: JClass, cli: jobjectArray) -> jlong {
let cli_len = env.get_array_length(cli).expect("invalid Java bindings");
let cli_len = env.get_array_length(cli).expect("invalid Java bindings") as usize;

let mut jni_strings = Vec::with_capacity(cli_len as usize);
let mut opts = Vec::with_capacity(cli_len as usize);
let mut opts_lens = Vec::with_capacity(cli_len as usize);
let mut jni_strings = Vec::with_capacity(cli_len);
let mut opts = Vec::with_capacity(cli_len);
let mut opts_lens = Vec::with_capacity(cli_len);

for n in 0..cli_len {
for n in 0..cli_len as i32 {
let elem = env.get_object_array_element(cli, n).expect("invalid Java bindings");
let elem_str: JString = elem.into();
match env.get_string(elem_str) {
Ok(s) => {
opts.push(s.as_ptr());
opts_lens.push(s.to_bytes().len());
jni_strings.push(s);
},
}
Err(err) => {
let _ = env.throw_new("java/lang/Exception", err.to_string());
return 0
Expand All @@ -86,7 +85,7 @@ pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_configFromCli(env:
}

let mut out = ptr::null_mut();
match parity_config_from_cli(opts.as_ptr(), opts_lens.as_ptr(), cli_len as usize, &mut out) {
match parity_config_from_cli(opts.as_ptr(), opts_lens.as_ptr(), cli_len, &mut out) {
0 => out as jlong,
_ => {
let _ = env.throw_new("java/lang/Exception", "failed to create config object");
Expand Down Expand Up @@ -120,7 +119,7 @@ pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_build(
_ => {
let _ = env.throw_new("java/lang/Exception", "failed to start Parity");
0
},
}
}
}

Expand All @@ -129,7 +128,7 @@ pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_destroy(_env: JNIEn
parity_destroy(parity);
}

unsafe fn async_checker<'a>(client: va_list, rpc: JString, callback: JObject, env: &JNIEnv<'a>)
unsafe fn java_query_checker<'a>(client: va_list, rpc: JString, callback: JObject, env: &JNIEnv<'a>)
-> Result<CheckedQuery<'a>, String> {
let query: String = env.get_string(rpc)
.map(Into::into)
Expand All @@ -151,26 +150,10 @@ pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_rpcQueryNative(
callback: JObject,
)
{
let _ = async_checker(parity, rpc, callback, &env)
let _ = java_query_checker(parity, rpc, callback, &env)
.map(|(client, query, jvm, global_ref)| {
let callback = Arc::new(Callback::new(jvm, global_ref));
let cb = callback.clone();
let future = client.rpc_query(&query, None).map(move |response| {
let response = response.unwrap_or_else(|| error::EMPTY.to_string());
callback.call(&response);
});

let _handle = thread::Builder::new()
.name("rpc_query".to_string())
.spawn(move || {
let mut current_thread = CurrentThread::new();
current_thread.spawn(future);
let _ = current_thread.run_timeout(Duration::from_millis(timeout_ms as u64))
.map_err(|_e| {
cb.call(error::TIMEOUT);
});
})
.expect("rpc-query thread shouldn't fail; qed");
let callback = Arc::new(JavaCallback::new(jvm, global_ref));
parity_rpc_worker(client, &query, callback, timeout_ms as u64);
})
.map_err(|e| {
let _ = env.throw_new("java/lang/Exception", e);
Expand All @@ -186,43 +169,10 @@ pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_subscribeWebSocketN
callback: JObject,
) -> va_list {

async_checker(parity, rpc, callback, &env)
java_query_checker(parity, rpc, callback, &env)
.map(move |(client, query, jvm, global_ref)| {
let callback = Arc::new(Callback::new(jvm, global_ref));
let (tx, mut rx) = mpsc::channel(1);
let session = Arc::new(PubSubSession::new(tx));
let weak_session = Arc::downgrade(&session);
let query_future = client.rpc_query(&query, Some(session.clone()));;

let _handle = thread::Builder::new()
.name("ws-subscriber".into())
.spawn(move || {
// Wait for subscription ID
// Note this may block forever and can't be destroyed using the session object
// However, this will likely timeout or be catched the RPC layer
if let Ok(Some(response)) = query_future.wait() {
callback.call(&response);
} else {
callback.call(error::SUBSCRIBE);
return;
};

loop {
for response in rx.by_ref().wait() {
if let Ok(r) = response {
callback.call(&r);
}
}

let rc = weak_session.upgrade().map_or(0,|session| Arc::strong_count(&session));
// No subscription left, then terminate
if rc <= 1 {
break;
}
}
})
.expect("rpc-subscriber thread shouldn't fail; qed");
Arc::into_raw(session) as va_list
let callback = Arc::new(JavaCallback::new(jvm, global_ref));
parity_ws_worker(client, &query, callback) as va_list
})
.unwrap_or_else(|e| {
let _ = env.throw_new("java/lang/Exception", e);
Expand Down
Loading