Skip to content

Commit

Permalink
feat: Publishing JSON body.
Browse files Browse the repository at this point in the history
  • Loading branch information
nstdio committed Sep 30, 2022
1 parent 7a772aa commit 79a9799
Show file tree
Hide file tree
Showing 15 changed files with 462 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ val brotli4JVersion = "1.8.0"
val brotliOrgVersion = "0.1.2"
val gsonVersion = "2.9.1"
val equalsverifierVersion = "3.10.1"
val coroutinesVersion = "1.6.4"

val jsonLibs = mapOf(
"jackson" to "com.fasterxml.jackson.core",
Expand All @@ -97,6 +98,8 @@ val spiDeps = listOf(
dependencies {
spiDeps.forEach { compileOnly(it) }

testImplementation(platform("org.jetbrains.kotlinx:kotlinx-coroutines-bom:$coroutinesVersion"))

/** AssertJ & Friends */
testImplementation("org.assertj:assertj-core:$assertJVersion")
testImplementation("io.kotest:kotest-assertions-core:$kotestAssertionsVersion")
Expand All @@ -116,6 +119,9 @@ dependencies {
testImplementation("nl.jqno.equalsverifier:equalsverifier:$equalsverifierVersion")
testImplementation("com.tngtech.archunit:archunit-junit5:1.0.0-rc1")

/** Kotlin Coroutines */
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-core")

spiDeps.forEach { spiTestImplementation(it) }
spiTestImplementation("com.aayushatharva.brotli4j:native-${getArch()}:$brotli4JVersion")
}
Expand Down
131 changes: 131 additions & 0 deletions src/main/java/io/github/nstdio/http/ext/BodyPublishers.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright (C) 2022 Edgar Asatryan
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.github.nstdio.http.ext;

import io.github.nstdio.http.ext.spi.JsonMappingProvider;

import java.io.ByteArrayOutputStream;
import java.net.http.HttpRequest.BodyPublisher;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;

/**
* Implementations of various useful {@link BodyPublisher}s.
*/
public final class BodyPublishers {
private BodyPublishers() {
}

/**
* Returns a request body publisher whose body is JSON representation of {@code body}. The conversion will be done
* using {@code JsonMappingProvider} default provider retrieved using {@link JsonMappingProvider#provider()}.
*
* @param body The body.
*
* @return a BodyPublisher
*/
public static BodyPublisher ofJson(Object body) {
return ofJson(body, JsonMappingProvider.provider());
}

/**
* Returns a request body publisher whose body is JSON representation of {@code body}. The conversion will be done
* using {@code jsonProvider}.
*
* @param body The body.
* @param jsonProvider The JSON mapping provider to use when creating JSON presentation of {@code body}.
*
* @return a BodyPublisher
*/
public static BodyPublisher ofJson(Object body, JsonMappingProvider jsonProvider) {
return ofJson(body, jsonProvider, null);
}

/**
* Returns a request body publisher whose body is JSON representation of {@code body}. The conversion will be done
* using {@code JsonMappingProvider} default provider retrieved using {@link JsonMappingProvider#provider()}.
*
* @param body The body.
* @param executor The scheduler to use to publish body to subscriber. If {@code null} the *
* {@link ForkJoinPool#commonPool()} will be used.
*
* @return a BodyPublisher
*/
public static BodyPublisher ofJson(Object body, Executor executor) {
return ofJson(body, JsonMappingProvider.provider(), executor);
}

/**
* Returns a request body publisher whose body is JSON representation of {@code body}. The conversion will be done *
* using {@code jsonProvider}.
*
* @param body The body.
* @param jsonProvider The JSON mapping provider to use when creating JSON presentation of {@code body}.
* @param executor The scheduler to use to publish body to subscriber. If {@code null} the
* {@link ForkJoinPool#commonPool()} will be used.
*
* @return a BodyPublisher
*/
public static BodyPublisher ofJson(Object body, JsonMappingProvider jsonProvider, Executor executor) {
return new JsonPublisher(body, jsonProvider, Optional.ofNullable(executor).orElseGet(ForkJoinPool::commonPool));
}

/**
* The {@code BodyPublisher} that converts objects to JSON.
*/
static final class JsonPublisher implements BodyPublisher {
private final Object body;
private final JsonMappingProvider provider;
private final Executor executor;

JsonPublisher(Object body, JsonMappingProvider provider, Executor executor) {
this.body = body;
this.provider = provider;
this.executor = executor;
}

@Override
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
var subscription = ByteArraySubscription.ofByteBuffer(subscriber, bytesSupplier(), executor);

subscriber.onSubscribe(subscription);
}

private Supplier<byte[]> bytesSupplier() {
return () -> {
var os = new ByteArrayOutputStream();
try {
provider.get().write(body, os);
} catch (Throwable e) {
Throwables.sneakyThrow(e);
}

return os.toByteArray();
};
}

@Override
public long contentLength() {
return -1;
}
}

}
76 changes: 59 additions & 17 deletions src/main/java/io/github/nstdio/http/ext/ByteArraySubscription.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,42 +18,84 @@

import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;

class ByteArraySubscription<T> implements Subscription {
private final Subscriber<T> subscriber;
private final Executor executor;

private final Function<byte[], T> mapper;
private final Supplier<byte[]> bytes;

class ByteArraySubscription implements Subscription {
private final Subscriber<List<ByteBuffer>> subscriber;
private final AtomicBoolean completed = new AtomicBoolean(false);
private final byte[] bytes;
private Future<?> result;

ByteArraySubscription(Subscriber<List<ByteBuffer>> subscriber, byte[] bytes) {
ByteArraySubscription(Subscriber<T> subscriber, Executor executor, Supplier<byte[]> bytes, Function<byte[], T> mapper) {
this.subscriber = subscriber;
this.executor = executor;
this.bytes = bytes;
this.mapper = mapper;
}

static ByteArraySubscription<List<ByteBuffer>> ofByteBufferList(Subscriber<List<ByteBuffer>> subscriber, byte[] bytes) {
return new ByteArraySubscription<>(subscriber, DirectExecutor.INSTANCE, () -> bytes, o -> List.of(ByteBuffer.wrap(o).asReadOnlyBuffer()));
}

static ByteArraySubscription<? super ByteBuffer> ofByteBuffer(Subscriber<? super ByteBuffer> subscriber, Supplier<byte[]> bytes, Executor executor) {
return new ByteArraySubscription<>(subscriber, executor, bytes, ByteBuffer::wrap);
}

@Override
public void request(long n) {
if (completed.get()) {
return;
}
if (!completed.getAndSet(true)) {
if (n > 0) {
submit(() -> {
try {
T item = mapper.apply(bytes.get());

if (n <= 0) {
subscriber.onError(new IllegalArgumentException("n <= 0"));
return;
subscriber.onNext(item);
subscriber.onComplete();
} catch (Throwable th) {
subscriber.onError(th);
}
});
} else {
var e = new IllegalArgumentException("n <= 0");
submit(() -> subscriber.onError(e));
}
}
}

@Override
public void cancel() {
completed.set(true);

ByteBuffer buffer = ByteBuffer.wrap(bytes).asReadOnlyBuffer();
List<ByteBuffer> item = List.of(buffer);
if (result != null) {
result.cancel(false);
}
}

subscriber.onNext(item);
subscriber.onComplete();
private void submit(Runnable r) {
if (executor instanceof ExecutorService) {
result = ((ExecutorService) executor).submit(r);
} else {
executor.execute(r);
}
}

@Override
public void cancel() {
completed.set(true);
private enum DirectExecutor implements Executor {
INSTANCE;

@Override
public void execute(Runnable command) {
command.run();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (C) 2022 Edgar Asatryan
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.github.nstdio.http.ext;

import io.github.nstdio.http.ext.BodyPublishers.JsonPublisher;

import java.net.http.HttpRequest.BodyPublisher;
import java.util.Map;
import java.util.Optional;

import static io.github.nstdio.http.ext.Headers.HEADER_CONTENT_TYPE;
import static java.util.function.Predicate.not;

class ContentTypeInterceptor implements Interceptor {
private final Interceptor headersAdding;

ContentTypeInterceptor(String contentType) {
headersAdding = new HeadersAddingInterceptor(Map.of(HEADER_CONTENT_TYPE, contentType));
}

@Override
public <T> Chain<T> intercept(Chain<T> in) {
if (!isJsonPublisher(in.request().bodyPublisher())) {
return in;
}

return headersAdding.intercept(in);
}

private static boolean isJsonPublisher(Optional<BodyPublisher> bodyPublisher) {
return bodyPublisher.filter(not(JsonPublisher.class::isInstance)).isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class ExtendedHttpClient extends HttpClient {
private final CompressionInterceptor compressionInterceptor;
private final CachingInterceptor cachingInterceptor;
private final HeadersAddingInterceptor headersAddingInterceptor;
private final ContentTypeInterceptor contentTypeInterceptor;

private final HttpClient delegate;
private final boolean allowInsecure;
Expand All @@ -68,6 +69,7 @@ private ExtendedHttpClient(CompressionInterceptor compressionInterceptor,
this.compressionInterceptor = compressionInterceptor;
this.cachingInterceptor = cachingInterceptor;
this.headersAddingInterceptor = headersAddingInterceptor;
this.contentTypeInterceptor = new ContentTypeInterceptor("application/json");
this.delegate = delegate;
this.allowInsecure = allowInsecure;
}
Expand Down Expand Up @@ -188,6 +190,7 @@ private <T> Chain<T> buildAndExecute(RequestContext ctx) {
chain = possiblyApply(compressionInterceptor, chain);
chain = possiblyApply(cachingInterceptor, chain);
chain = possiblyApply(headersAddingInterceptor, chain);
chain = possiblyApply(contentTypeInterceptor, chain);

return chain;
}
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/github/nstdio/http/ext/Headers.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class Headers {
static final String HEADER_VARY = "Vary";
static final String HEADER_CONTENT_ENCODING = "Content-Encoding";
static final String HEADER_CONTENT_LENGTH = "Content-Length";
static final String HEADER_CONTENT_TYPE = "Content-Type";
static final String HEADER_IF_MODIFIED_SINCE = "If-Modified-Since";
static final String HEADER_IF_NONE_MATCH = "If-None-Match";
static final String HEADER_CACHE_CONTROL = "Cache-Control";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ class HeadersAddingInterceptor implements Interceptor {
this.headers = headers;
this.resolvableHeaders = resolvableHeaders;
}

HeadersAddingInterceptor(Map<String, String> headers) {
this(headers, Map.of());
}

@Override
public <T> Chain<T> intercept(Chain<T> in) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/github/nstdio/http/ext/InMemoryCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public long bodySize() {

@Override
public void subscribeTo(Flow.Subscriber<List<ByteBuffer>> sub) {
Flow.Subscription subscription = new ByteArraySubscription(sub, body);
Flow.Subscription subscription = ByteArraySubscription.ofByteBufferList(sub, body);
sub.onSubscribe(subscription);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.lang.reflect.Type;
import java.util.Objects;

Expand Down Expand Up @@ -67,4 +69,11 @@ public <T> T read(byte[] bytes, Class<T> targetType) throws IOException {
public <T> T read(byte[] bytes, Type targetType) throws IOException {
return read(new ByteArrayInputStream(bytes), targetType);
}

@Override
public void write(Object o, OutputStream os) throws IOException {
try (var writer = new OutputStreamWriter(os, UTF_8)) {
gson.toJson(o, writer);
}
}
}
Loading

0 comments on commit 79a9799

Please sign in to comment.