Skip to content

Commit

Permalink
chore: Increase write performance by using bulk writes when possible.
Browse files Browse the repository at this point in the history
  • Loading branch information
nstdio committed Dec 25, 2022
1 parent f814247 commit 59a08f0
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 5 deletions.
13 changes: 11 additions & 2 deletions src/main/java/io/github/nstdio/http/ext/PathSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.io.IOException;
import java.net.http.HttpResponse;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.util.List;
Expand Down Expand Up @@ -70,11 +71,19 @@ private synchronized void createChannel() {
@Override
public void onNext(List<ByteBuffer> item) {
try {
write(item);
} catch (IOException ex) {
onError(ex);
}
}

private void write(List<ByteBuffer> item) throws IOException {
if (out instanceof GatheringByteChannel) {
((GatheringByteChannel) out).write(item.toArray(ByteBuffer[]::new));
} else {
for (ByteBuffer buffer : item) {
out.write(buffer);
}
} catch (IOException ex) {
onError(ex);
}
}

Expand Down
19 changes: 19 additions & 0 deletions src/main/java/io/github/nstdio/http/ext/SimpleStreamFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,35 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

import static java.nio.file.StandardOpenOption.READ;

class SimpleStreamFactory implements StreamFactory {
private static void assertNotContains(OpenOption[] options, StandardOpenOption needle) {
for (OpenOption option : options) {
if (option == needle) {
throw new IllegalArgumentException(needle + " not allowed");
}
}
}

@Override
public OutputStream output(Path path, OpenOption... options) throws IOException {
return Files.newOutputStream(path, options);
}

@Override
public WritableByteChannel writable(Path path, OpenOption... options) throws IOException {
assertNotContains(options, READ);

return Files.newByteChannel(path, options);
}

@Override
public InputStream input(Path path, OpenOption... options) throws IOException {
return Files.newInputStream(path, options);
Expand Down
36 changes: 33 additions & 3 deletions src/test/kotlin/io/github/nstdio/http/ext/PathSubscriberTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,17 @@
*/
package io.github.nstdio.http.ext

import io.kotest.matchers.booleans.shouldBeTrue
import io.kotest.matchers.future.shouldBeCompletedExceptionally
import org.junit.jupiter.api.Test
import org.mockito.BDDMockito.given
import org.mockito.BDDMockito.verify
import org.mockito.Mockito.any
import org.mockito.Mockito.mock
import java.io.IOException
import java.nio.ByteBuffer
import java.nio.channels.WritableByteChannel
import java.nio.file.Path
import java.util.concurrent.Flow

internal class PathSubscriberTest {
@Test
Expand All @@ -32,7 +39,7 @@ internal class PathSubscriberTest {
val body = subscriber.body.toCompletableFuture()

//then
body.isCompletedExceptionally.shouldBeTrue()
body.shouldBeCompletedExceptionally()
}

@Test
Expand All @@ -45,6 +52,29 @@ internal class PathSubscriberTest {
val body = subscriber.body.toCompletableFuture()

//then
body.isCompletedExceptionally.shouldBeTrue()
body.shouldBeCompletedExceptionally()
}

@Test
fun `Should complete exceptional when next throws`() {
//given
val mockStreamFactory = mock(StreamFactory::class.java)
val mockSub = mock(Flow.Subscription::class.java)
val mockChannel = mock(WritableByteChannel::class.java)
val subscriber = PathSubscriber(mockStreamFactory, Path.of("abc"))

given(mockStreamFactory.writable(any(), any()))
.willReturn(mockChannel)
given(mockChannel.write(any())).willThrow(IOException())

//when
subscriber.onSubscribe(mockSub)
subscriber.onNext(listOf(ByteBuffer.allocate(1)))

val body = subscriber.body.toCompletableFuture()

//then
body.shouldBeCompletedExceptionally()
verify(mockChannel).close()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (C) 2022 Edgar Asatryan
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.github.nstdio.http.ext

import io.kotest.assertions.throwables.shouldThrowExactly
import io.kotest.matchers.throwable.shouldHaveMessage
import org.junit.jupiter.api.Test
import java.nio.file.Path
import java.nio.file.StandardOpenOption.READ
import java.nio.file.StandardOpenOption.WRITE

class SimpleStreamFactoryTest {
@Test
fun `Should not allow read option on write method`() {
//given
val factory = SimpleStreamFactory()

//when + then
shouldThrowExactly<IllegalArgumentException> {
factory.writable(Path.of("any"), READ, WRITE)
}.shouldHaveMessage("READ not allowed")
}
}

0 comments on commit 59a08f0

Please sign in to comment.