Skip to content

Commit

Permalink
feat(bindings/java): support append (#2350)
Browse files Browse the repository at this point in the history
* feat: improve bytes passing for write

Signed-off-by: tison <[email protected]>

* feat: support append

Signed-off-by: tison <[email protected]>

---------

Signed-off-by: tison <[email protected]>
  • Loading branch information
tisonkun authored May 28, 2023
1 parent bd75a75 commit 3e6efbd
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 32 deletions.
12 changes: 12 additions & 0 deletions bindings/java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
<jniClassifier>${os.detected.classifier}</jniClassifier>

<assertj-version>3.23.1</assertj-version>
<lombok.version>1.18.26</lombok.version>
<questdb.version>1.0.0</questdb.version>

<maven-surefire-plugin.version>3.0.0</maven-surefire-plugin.version>
Expand Down Expand Up @@ -84,10 +85,21 @@
<artifactId>assertj-core</artifactId>
<version>${assertj-version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.questdb</groupId>
<artifactId>jar-jni</artifactId>
Expand Down
10 changes: 5 additions & 5 deletions bindings/java/src/blocking_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

use std::str::FromStr;

use jni::objects::JClass;
use jni::objects::JObject;
use jni::objects::JString;
use jni::objects::{JByteArray, JClass};
use jni::sys::jlong;
use jni::sys::jstring;
use jni::JNIEnv;
Expand Down Expand Up @@ -93,7 +93,7 @@ pub unsafe extern "system" fn Java_org_apache_opendal_BlockingOperator_write(
_: JClass,
op: *mut BlockingOperator,
path: JString,
content: JString,
content: JByteArray,
) {
intern_write(&mut env, &mut *op, path, content).unwrap_or_else(|e| {
e.throw(&mut env);
Expand All @@ -104,11 +104,11 @@ fn intern_write(
env: &mut JNIEnv,
op: &mut BlockingOperator,
path: JString,
content: JString,
content: JByteArray,
) -> Result<()> {
let path = env.get_string(&path)?;
let content = env.get_string(&content)?;
Ok(op.write(path.to_str()?, content.to_str()?.to_string())?)
let content = env.convert_byte_array(content)?;
Ok(op.write(path.to_str()?, content)?)
}

/// # Safety
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.opendal;

import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
Expand All @@ -41,6 +42,10 @@ public BlockingOperator(String schema, Map<String, String> map) {
}

public void write(String path, String content) {
write(path, content.getBytes(StandardCharsets.UTF_8));
}

public void write(String path, byte[] content) {
write(nativeHandle, path, content);
}

Expand All @@ -61,7 +66,7 @@ public Metadata stat(String path) {

private static native long constructor(String schema, Map<String, String> map);

private static native void write(long nativeHandle, String path, String content);
private static native void write(long nativeHandle, String path, byte[] content);

private static native String read(long nativeHandle, String path);

Expand Down
18 changes: 17 additions & 1 deletion bindings/java/src/main/java/org/apache/opendal/Operator.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.opendal;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -115,10 +116,23 @@ public Operator(String schema, Map<String, String> map) {
}

public CompletableFuture<Void> write(String path, String content) {
return write(path, content.getBytes(StandardCharsets.UTF_8));
}

public CompletableFuture<Void> write(String path, byte[] content) {
final long requestId = write(nativeHandle, path, content);
return registry().take(requestId);
}

public CompletableFuture<Void> append(String path, String content) {
return append(path, content.getBytes(StandardCharsets.UTF_8));
}

public CompletableFuture<Void> append(String path, byte[] content) {
final long requestId = append(nativeHandle, path, content);
return registry().take(requestId);
}

public CompletableFuture<Metadata> stat(String path) {
final long requestId = stat(nativeHandle, path);
final CompletableFuture<Long> f = registry().take(requestId);
Expand All @@ -142,7 +156,9 @@ public CompletableFuture<Void> delete(String path) {

private static native long read(long nativeHandle, String path);

private static native long write(long nativeHandle, String path, String content);
private static native long write(long nativeHandle, String path, byte[] content);

private static native long append(long nativeHandle, String path, byte[] content);

private static native long delete(long nativeHandle, String path);

Expand Down
52 changes: 47 additions & 5 deletions bindings/java/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@

use std::str::FromStr;

use jni::objects::JClass;
use jni::objects::JObject;
use jni::objects::JString;
use jni::objects::JValue;
use jni::objects::JValueOwned;
use jni::objects::{JByteArray, JClass};
use jni::sys::jlong;
use jni::JNIEnv;
use opendal::Operator;
Expand Down Expand Up @@ -73,7 +73,7 @@ pub unsafe extern "system" fn Java_org_apache_opendal_Operator_write(
_: JClass,
op: *mut Operator,
path: JString,
content: JString,
content: JByteArray,
) -> jlong {
intern_write(&mut env, op, path, content).unwrap_or_else(|e| {
e.throw(&mut env);
Expand All @@ -85,13 +85,13 @@ fn intern_write(
env: &mut JNIEnv,
op: *mut Operator,
path: JString,
content: JString,
content: JByteArray,
) -> Result<jlong> {
let op = unsafe { &mut *op };
let id = request_id(env)?;

let path = env.get_string(&path)?.to_str()?.to_string();
let content = env.get_string(&content)?.to_str()?.to_string();
let content = env.convert_byte_array(content)?;

let runtime = unsafe { RUNTIME.get_unchecked() };
runtime.spawn(async move {
Expand All @@ -102,10 +102,52 @@ fn intern_write(
Ok(id)
}

async fn do_write(op: &mut Operator, path: String, content: String) -> Result<()> {
async fn do_write(op: &mut Operator, path: String, content: Vec<u8>) -> Result<()> {
Ok(op.write(&path, content).await?)
}

/// # Safety
///
/// This function should not be called before the Operator are ready.
#[no_mangle]
pub unsafe extern "system" fn Java_org_apache_opendal_Operator_append(
mut env: JNIEnv,
_: JClass,
op: *mut Operator,
path: JString,
content: JByteArray,
) -> jlong {
intern_append(&mut env, op, path, content).unwrap_or_else(|e| {
e.throw(&mut env);
0
})
}

fn intern_append(
env: &mut JNIEnv,
op: *mut Operator,
path: JString,
content: JByteArray,
) -> Result<jlong> {
let op = unsafe { &mut *op };
let id = request_id(env)?;

let path = env.get_string(&path)?.to_str()?.to_string();
let content = env.convert_byte_array(content)?;

let runtime = unsafe { RUNTIME.get_unchecked() };
runtime.spawn(async move {
let result = do_append(op, path, content).await;
complete_future(id, result.map(|_| JValueOwned::Void))
});

Ok(id)
}

async fn do_append(op: &mut Operator, path: String, content: Vec<u8>) -> Result<()> {
Ok(op.append(&path, content).await?)
}

/// # Safety
///
/// This function should not be called before the Operator are ready.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.cucumber.java.en.When;
import java.util.HashMap;
import java.util.Map;
import lombok.Cleanup;

public class AsyncStepsTest {
Operator op;
Expand All @@ -45,19 +46,19 @@ public void async_write_path_test_with_content_hello_world(String path, String c

@Then("The async file {string} should exist")
public void the_async_file_test_should_exist(String path) {
Metadata metadata = op.stat(path).join();
@Cleanup Metadata metadata = op.stat(path).join();
assertNotNull(metadata);
}

@Then("The async file {string} entry mode must be file")
public void the_async_file_test_entry_mode_must_be_file(String path) {
Metadata metadata = op.stat(path).join();
@Cleanup Metadata metadata = op.stat(path).join();
assertTrue(metadata.isFile());
}

@Then("The async file {string} content length must be {int}")
public void the_async_file_test_content_length_must_be_13(String path, int length) {
Metadata metadata = op.stat(path).join();
@Cleanup Metadata metadata = op.stat(path).join();
assertEquals(metadata.getContentLength(), length);
}

Expand Down
49 changes: 35 additions & 14 deletions bindings/java/src/test/java/org/apache/opendal/OperatorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,26 @@
package org.apache.opendal;

import static org.assertj.core.api.Assertions.assertThat;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletionException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import java.util.stream.Collectors;
import lombok.Cleanup;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class OperatorTest {
private Operator op;
@TempDir
private static Path tempDir;

@BeforeEach
public void init() {
@Test
public void testCreateAndDelete() {
Map<String, String> params = new HashMap<>();
params.put("root", "/tmp");
this.op = new Operator("Memory", params);
}

@AfterEach
public void clean() {
this.op.close();
}
@Cleanup Operator op = new Operator("Memory", params);

@Test
public void testCreateAndDelete() {
op.write("testCreateAndDelete", "Odin").join();
assertThat(op.read("testCreateAndDelete").join()).isEqualTo("Odin");
op.delete("testCreateAndDelete").join();
Expand All @@ -57,4 +53,29 @@ public void testCreateAndDelete() {
})
.join();
}

@Test
public void testAppendManyTimes() {
Map<String, String> params = new HashMap<>();
params.put("root", tempDir.toString());
@Cleanup Operator op = new Operator("fs", params);

String[] trunks = new String[] {"first trunk", "second trunk", "third trunk"};

for (int i = 0; i < trunks.length; i++) {
op.append("testAppendManyTimes", trunks[i]).join();
String expected = Arrays.stream(trunks).limit(i + 1).collect(Collectors.joining());
assertThat(op.read("testAppendManyTimes").join()).isEqualTo(expected);
}

// write overwrite existing content
op.write("testAppendManyTimes", "new attempt").join();
assertThat(op.read("testAppendManyTimes").join()).isEqualTo("new attempt");

for (int i = 0; i < trunks.length; i++) {
op.append("testAppendManyTimes", trunks[i]).join();
String expected = Arrays.stream(trunks).limit(i + 1).collect(Collectors.joining());
assertThat(op.read("testAppendManyTimes").join()).isEqualTo("new attempt" + expected);
}
}
}
7 changes: 4 additions & 3 deletions bindings/java/src/test/java/org/apache/opendal/StepsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.cucumber.java.en.When;
import java.util.HashMap;
import java.util.Map;
import lombok.Cleanup;

public class StepsTest {
BlockingOperator op;
Expand All @@ -45,19 +46,19 @@ public void blocking_write_path_test_with_content_hello_world(String path, Strin

@Then("The blocking file {string} should exist")
public void the_blocking_file_test_should_exist(String path) {
Metadata metadata = op.stat(path);
@Cleanup Metadata metadata = op.stat(path);
assertNotNull(metadata);
}

@Then("The blocking file {string} entry mode must be file")
public void the_blocking_file_test_entry_mode_must_be_file(String path) {
Metadata metadata = op.stat(path);
@Cleanup Metadata metadata = op.stat(path);
assertTrue(metadata.isFile());
}

@Then("The blocking file {string} content length must be {int}")
public void the_blocking_file_test_content_length_must_be_13(String path, int length) {
Metadata metadata = op.stat(path);
@Cleanup Metadata metadata = op.stat(path);
assertEquals(metadata.getContentLength(), length);
}

Expand Down

0 comments on commit 3e6efbd

Please sign in to comment.