From 3e6efbd99546adfc546025e7373c99dddf706193 Mon Sep 17 00:00:00 2001 From: tison Date: Sun, 28 May 2023 16:32:07 +0800 Subject: [PATCH] feat(bindings/java): support append (#2350) * feat: improve bytes passing for write Signed-off-by: tison * feat: support append Signed-off-by: tison --------- Signed-off-by: tison --- bindings/java/pom.xml | 12 +++++ bindings/java/src/blocking_operator.rs | 10 ++-- .../org/apache/opendal/BlockingOperator.java | 7 ++- .../java/org/apache/opendal/Operator.java | 18 ++++++- bindings/java/src/operator.rs | 52 +++++++++++++++++-- .../org/apache/opendal/AsyncStepsTest.java | 7 +-- .../java/org/apache/opendal/OperatorTest.java | 49 ++++++++++++----- .../java/org/apache/opendal/StepsTest.java | 7 +-- 8 files changed, 130 insertions(+), 32 deletions(-) diff --git a/bindings/java/pom.xml b/bindings/java/pom.xml index 59934453f28f..69cb5c226216 100644 --- a/bindings/java/pom.xml +++ b/bindings/java/pom.xml @@ -56,6 +56,7 @@ ${os.detected.classifier} 3.23.1 + 1.18.26 1.0.0 3.0.0 @@ -84,10 +85,21 @@ assertj-core ${assertj-version} + + org.projectlombok + lombok + ${lombok.version} + + + org.projectlombok + lombok + provided + + org.questdb jar-jni diff --git a/bindings/java/src/blocking_operator.rs b/bindings/java/src/blocking_operator.rs index e8ca350489c3..68944b8b33b4 100644 --- a/bindings/java/src/blocking_operator.rs +++ b/bindings/java/src/blocking_operator.rs @@ -17,9 +17,9 @@ use std::str::FromStr; -use jni::objects::JClass; use jni::objects::JObject; use jni::objects::JString; +use jni::objects::{JByteArray, JClass}; use jni::sys::jlong; use jni::sys::jstring; use jni::JNIEnv; @@ -93,7 +93,7 @@ pub unsafe extern "system" fn Java_org_apache_opendal_BlockingOperator_write( _: JClass, op: *mut BlockingOperator, path: JString, - content: JString, + content: JByteArray, ) { intern_write(&mut env, &mut *op, path, content).unwrap_or_else(|e| { e.throw(&mut env); @@ -104,11 +104,11 @@ fn intern_write( env: &mut JNIEnv, op: &mut BlockingOperator, path: JString, - content: JString, + content: JByteArray, ) -> Result<()> { let path = env.get_string(&path)?; - let content = env.get_string(&content)?; - Ok(op.write(path.to_str()?, content.to_str()?.to_string())?) + let content = env.convert_byte_array(content)?; + Ok(op.write(path.to_str()?, content)?) } /// # Safety diff --git a/bindings/java/src/main/java/org/apache/opendal/BlockingOperator.java b/bindings/java/src/main/java/org/apache/opendal/BlockingOperator.java index f5d8ee9cae83..7413b2b044dd 100644 --- a/bindings/java/src/main/java/org/apache/opendal/BlockingOperator.java +++ b/bindings/java/src/main/java/org/apache/opendal/BlockingOperator.java @@ -19,6 +19,7 @@ package org.apache.opendal; +import java.nio.charset.StandardCharsets; import java.util.Map; /** @@ -41,6 +42,10 @@ public BlockingOperator(String schema, Map map) { } public void write(String path, String content) { + write(path, content.getBytes(StandardCharsets.UTF_8)); + } + + public void write(String path, byte[] content) { write(nativeHandle, path, content); } @@ -61,7 +66,7 @@ public Metadata stat(String path) { private static native long constructor(String schema, Map map); - private static native void write(long nativeHandle, String path, String content); + private static native void write(long nativeHandle, String path, byte[] content); private static native String read(long nativeHandle, String path); diff --git a/bindings/java/src/main/java/org/apache/opendal/Operator.java b/bindings/java/src/main/java/org/apache/opendal/Operator.java index 8bec89c284dc..288a9e12862d 100644 --- a/bindings/java/src/main/java/org/apache/opendal/Operator.java +++ b/bindings/java/src/main/java/org/apache/opendal/Operator.java @@ -19,6 +19,7 @@ package org.apache.opendal; +import java.nio.charset.StandardCharsets; import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -115,10 +116,23 @@ public Operator(String schema, Map map) { } public CompletableFuture write(String path, String content) { + return write(path, content.getBytes(StandardCharsets.UTF_8)); + } + + public CompletableFuture write(String path, byte[] content) { final long requestId = write(nativeHandle, path, content); return registry().take(requestId); } + public CompletableFuture append(String path, String content) { + return append(path, content.getBytes(StandardCharsets.UTF_8)); + } + + public CompletableFuture append(String path, byte[] content) { + final long requestId = append(nativeHandle, path, content); + return registry().take(requestId); + } + public CompletableFuture stat(String path) { final long requestId = stat(nativeHandle, path); final CompletableFuture f = registry().take(requestId); @@ -142,7 +156,9 @@ public CompletableFuture delete(String path) { private static native long read(long nativeHandle, String path); - private static native long write(long nativeHandle, String path, String content); + private static native long write(long nativeHandle, String path, byte[] content); + + private static native long append(long nativeHandle, String path, byte[] content); private static native long delete(long nativeHandle, String path); diff --git a/bindings/java/src/operator.rs b/bindings/java/src/operator.rs index b7e260019b35..3beb845c35d4 100644 --- a/bindings/java/src/operator.rs +++ b/bindings/java/src/operator.rs @@ -17,11 +17,11 @@ use std::str::FromStr; -use jni::objects::JClass; use jni::objects::JObject; use jni::objects::JString; use jni::objects::JValue; use jni::objects::JValueOwned; +use jni::objects::{JByteArray, JClass}; use jni::sys::jlong; use jni::JNIEnv; use opendal::Operator; @@ -73,7 +73,7 @@ pub unsafe extern "system" fn Java_org_apache_opendal_Operator_write( _: JClass, op: *mut Operator, path: JString, - content: JString, + content: JByteArray, ) -> jlong { intern_write(&mut env, op, path, content).unwrap_or_else(|e| { e.throw(&mut env); @@ -85,13 +85,13 @@ fn intern_write( env: &mut JNIEnv, op: *mut Operator, path: JString, - content: JString, + content: JByteArray, ) -> Result { let op = unsafe { &mut *op }; let id = request_id(env)?; let path = env.get_string(&path)?.to_str()?.to_string(); - let content = env.get_string(&content)?.to_str()?.to_string(); + let content = env.convert_byte_array(content)?; let runtime = unsafe { RUNTIME.get_unchecked() }; runtime.spawn(async move { @@ -102,10 +102,52 @@ fn intern_write( Ok(id) } -async fn do_write(op: &mut Operator, path: String, content: String) -> Result<()> { +async fn do_write(op: &mut Operator, path: String, content: Vec) -> Result<()> { Ok(op.write(&path, content).await?) } +/// # Safety +/// +/// This function should not be called before the Operator are ready. +#[no_mangle] +pub unsafe extern "system" fn Java_org_apache_opendal_Operator_append( + mut env: JNIEnv, + _: JClass, + op: *mut Operator, + path: JString, + content: JByteArray, +) -> jlong { + intern_append(&mut env, op, path, content).unwrap_or_else(|e| { + e.throw(&mut env); + 0 + }) +} + +fn intern_append( + env: &mut JNIEnv, + op: *mut Operator, + path: JString, + content: JByteArray, +) -> Result { + let op = unsafe { &mut *op }; + let id = request_id(env)?; + + let path = env.get_string(&path)?.to_str()?.to_string(); + let content = env.convert_byte_array(content)?; + + let runtime = unsafe { RUNTIME.get_unchecked() }; + runtime.spawn(async move { + let result = do_append(op, path, content).await; + complete_future(id, result.map(|_| JValueOwned::Void)) + }); + + Ok(id) +} + +async fn do_append(op: &mut Operator, path: String, content: Vec) -> Result<()> { + Ok(op.append(&path, content).await?) +} + /// # Safety /// /// This function should not be called before the Operator are ready. diff --git a/bindings/java/src/test/java/org/apache/opendal/AsyncStepsTest.java b/bindings/java/src/test/java/org/apache/opendal/AsyncStepsTest.java index 179d47b7b446..57a678cdc59b 100644 --- a/bindings/java/src/test/java/org/apache/opendal/AsyncStepsTest.java +++ b/bindings/java/src/test/java/org/apache/opendal/AsyncStepsTest.java @@ -27,6 +27,7 @@ import io.cucumber.java.en.When; import java.util.HashMap; import java.util.Map; +import lombok.Cleanup; public class AsyncStepsTest { Operator op; @@ -45,19 +46,19 @@ public void async_write_path_test_with_content_hello_world(String path, String c @Then("The async file {string} should exist") public void the_async_file_test_should_exist(String path) { - Metadata metadata = op.stat(path).join(); + @Cleanup Metadata metadata = op.stat(path).join(); assertNotNull(metadata); } @Then("The async file {string} entry mode must be file") public void the_async_file_test_entry_mode_must_be_file(String path) { - Metadata metadata = op.stat(path).join(); + @Cleanup Metadata metadata = op.stat(path).join(); assertTrue(metadata.isFile()); } @Then("The async file {string} content length must be {int}") public void the_async_file_test_content_length_must_be_13(String path, int length) { - Metadata metadata = op.stat(path).join(); + @Cleanup Metadata metadata = op.stat(path).join(); assertEquals(metadata.getContentLength(), length); } diff --git a/bindings/java/src/test/java/org/apache/opendal/OperatorTest.java b/bindings/java/src/test/java/org/apache/opendal/OperatorTest.java index c3085f5e7491..9e81e535e4ec 100644 --- a/bindings/java/src/test/java/org/apache/opendal/OperatorTest.java +++ b/bindings/java/src/test/java/org/apache/opendal/OperatorTest.java @@ -20,30 +20,26 @@ package org.apache.opendal; import static org.assertj.core.api.Assertions.assertThat; +import java.nio.file.Path; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletionException; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; +import java.util.stream.Collectors; +import lombok.Cleanup; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; public class OperatorTest { - private Operator op; + @TempDir + private static Path tempDir; - @BeforeEach - public void init() { + @Test + public void testCreateAndDelete() { Map params = new HashMap<>(); params.put("root", "/tmp"); - this.op = new Operator("Memory", params); - } - - @AfterEach - public void clean() { - this.op.close(); - } + @Cleanup Operator op = new Operator("Memory", params); - @Test - public void testCreateAndDelete() { op.write("testCreateAndDelete", "Odin").join(); assertThat(op.read("testCreateAndDelete").join()).isEqualTo("Odin"); op.delete("testCreateAndDelete").join(); @@ -57,4 +53,29 @@ public void testCreateAndDelete() { }) .join(); } + + @Test + public void testAppendManyTimes() { + Map params = new HashMap<>(); + params.put("root", tempDir.toString()); + @Cleanup Operator op = new Operator("fs", params); + + String[] trunks = new String[] {"first trunk", "second trunk", "third trunk"}; + + for (int i = 0; i < trunks.length; i++) { + op.append("testAppendManyTimes", trunks[i]).join(); + String expected = Arrays.stream(trunks).limit(i + 1).collect(Collectors.joining()); + assertThat(op.read("testAppendManyTimes").join()).isEqualTo(expected); + } + + // write overwrite existing content + op.write("testAppendManyTimes", "new attempt").join(); + assertThat(op.read("testAppendManyTimes").join()).isEqualTo("new attempt"); + + for (int i = 0; i < trunks.length; i++) { + op.append("testAppendManyTimes", trunks[i]).join(); + String expected = Arrays.stream(trunks).limit(i + 1).collect(Collectors.joining()); + assertThat(op.read("testAppendManyTimes").join()).isEqualTo("new attempt" + expected); + } + } } diff --git a/bindings/java/src/test/java/org/apache/opendal/StepsTest.java b/bindings/java/src/test/java/org/apache/opendal/StepsTest.java index 428388ee9381..c72a0619d5ec 100644 --- a/bindings/java/src/test/java/org/apache/opendal/StepsTest.java +++ b/bindings/java/src/test/java/org/apache/opendal/StepsTest.java @@ -27,6 +27,7 @@ import io.cucumber.java.en.When; import java.util.HashMap; import java.util.Map; +import lombok.Cleanup; public class StepsTest { BlockingOperator op; @@ -45,19 +46,19 @@ public void blocking_write_path_test_with_content_hello_world(String path, Strin @Then("The blocking file {string} should exist") public void the_blocking_file_test_should_exist(String path) { - Metadata metadata = op.stat(path); + @Cleanup Metadata metadata = op.stat(path); assertNotNull(metadata); } @Then("The blocking file {string} entry mode must be file") public void the_blocking_file_test_entry_mode_must_be_file(String path) { - Metadata metadata = op.stat(path); + @Cleanup Metadata metadata = op.stat(path); assertTrue(metadata.isFile()); } @Then("The blocking file {string} content length must be {int}") public void the_blocking_file_test_content_length_must_be_13(String path, int length) { - Metadata metadata = op.stat(path); + @Cleanup Metadata metadata = op.stat(path); assertEquals(metadata.getContentLength(), length); }