Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bindings/java): save one jni call in the hot path #2353

Merged
merged 1 commit into from
May 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 11 additions & 14 deletions bindings/java/src/main/java/org/apache/opendal/Operator.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@
* accesses data asynchronously.
*/
public class Operator extends NativeObject {
private static AsyncRegistry registry() {
return AsyncRegistry.INSTANCE;
}

/**
* Singleton to hold all outstanding futures.
Expand Down Expand Up @@ -61,11 +58,11 @@ private enum AsyncRegistry {
* @return the request ID associated to the obtained future
*/
@SuppressWarnings("unused")
private long requestId() {
private static long requestId() {
final CompletableFuture<?> f = new CompletableFuture<>();
while (true) {
final long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
final CompletableFuture<?> prev = registry.putIfAbsent(requestId, f);
final CompletableFuture<?> prev = INSTANCE.registry.putIfAbsent(requestId, f);
if (prev == null) {
return requestId;
}
Expand All @@ -81,8 +78,8 @@ private long requestId() {
* @param requestId to identify the future
* @return the future associated with the request ID
*/
private CompletableFuture<?> get(long requestId) {
return registry.get(requestId);
private static CompletableFuture<?> get(long requestId) {
return INSTANCE.registry.get(requestId);
}

/**
Expand All @@ -92,10 +89,10 @@ private CompletableFuture<?> get(long requestId) {
* @return the future associated with the request ID
*/
@SuppressWarnings("unchecked")
private <T> CompletableFuture<T> take(long requestId) {
private static <T> CompletableFuture<T> take(long requestId) {
final CompletableFuture<?> f = get(requestId);
if (f != null) {
f.whenComplete((r, e) -> registry.remove(requestId));
f.whenComplete((r, e) -> INSTANCE.registry.remove(requestId));
}
return (CompletableFuture<T>) f;
}
Expand All @@ -121,7 +118,7 @@ public CompletableFuture<Void> write(String path, String content) {

public CompletableFuture<Void> write(String path, byte[] content) {
final long requestId = write(nativeHandle, path, content);
return registry().take(requestId);
return AsyncRegistry.take(requestId);
}

public CompletableFuture<Void> append(String path, String content) {
Expand All @@ -130,23 +127,23 @@ public CompletableFuture<Void> append(String path, String content) {

public CompletableFuture<Void> append(String path, byte[] content) {
final long requestId = append(nativeHandle, path, content);
return registry().take(requestId);
return AsyncRegistry.take(requestId);
}

public CompletableFuture<Metadata> stat(String path) {
final long requestId = stat(nativeHandle, path);
final CompletableFuture<Long> f = registry().take(requestId);
final CompletableFuture<Long> f = AsyncRegistry.take(requestId);
return f.thenApply(Metadata::new);
}

public CompletableFuture<String> read(String path) {
final long requestId = read(nativeHandle, path);
return registry().take(requestId);
return AsyncRegistry.take(requestId);
}

public CompletableFuture<Void> delete(String path) {
final long requestId = delete(nativeHandle, path);
return registry().take(requestId);
return AsyncRegistry.take(requestId);
}

@Override
Expand Down
31 changes: 11 additions & 20 deletions bindings/java/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,18 +254,6 @@ async fn do_delete(op: &mut Operator, path: String) -> Result<()> {
Ok(op.delete(&path).await?)
}

fn request_id(env: &mut JNIEnv) -> Result<jlong> {
let registry = env
.call_static_method(
"org/apache/opendal/Operator",
"registry",
"()Lorg/apache/opendal/Operator$AsyncRegistry;",
&[],
)?
.l()?;
Ok(env.call_method(registry, "requestId", "()J", &[])?.j()?)
}

fn make_object<'local>(
env: &mut JNIEnv<'local>,
value: JValueOwned<'local>,
Expand Down Expand Up @@ -312,18 +300,21 @@ fn complete_future(id: jlong, result: Result<JValueOwned>) {
};
}

fn get_future<'local>(env: &mut JNIEnv<'local>, id: jlong) -> Result<JObject<'local>> {
let registry = env
fn request_id(env: &mut JNIEnv) -> Result<jlong> {
Ok(env
.call_static_method(
"org/apache/opendal/Operator",
"registry",
"()Lorg/apache/opendal/Operator$AsyncRegistry;",
"org/apache/opendal/Operator$AsyncRegistry",
"requestId",
"()J",
&[],
)?
.l()?;
.j()?)
}

fn get_future<'local>(env: &mut JNIEnv<'local>, id: jlong) -> Result<JObject<'local>> {
Ok(env
.call_method(
registry,
.call_static_method(
"org/apache/opendal/Operator$AsyncRegistry",
"get",
"(J)Ljava/util/concurrent/CompletableFuture;",
&[JValue::Long(id)],
Expand Down