Skip to content

Commit

Permalink
An SPI for interacting with local context storage.
Browse files Browse the repository at this point in the history
The existing implementation uses on a concurrent hash map guarded by the context instance.

This new implementation relies on prior knowledge of the set of context locals, so the context allocates upfront the required space to hold the local storage, reducing synchronisation and simplifying lookup that uses allocated index instead of hashed string lookups.

The existing local context data is retrofit as a context local.
  • Loading branch information
vietj committed Mar 1, 2024
1 parent 5693d74 commit 88cf77b
Show file tree
Hide file tree
Showing 21 changed files with 611 additions and 116 deletions.
15 changes: 15 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,21 @@
</additionalClasspathElements>
</configuration>
</execution>
<execution>
<id>custom-context-local</id>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
<configuration>
<includes>
<include>io/vertx/it/CustomContextLocalTest.java</include>
</includes>
<additionalClasspathElements>
<additionalClasspathElement>${project.basedir}/src/test/classpath/customcontextlocal</additionalClasspathElement>
</additionalClasspathElements>
</configuration>
</execution>
</executions>
</plugin>

Expand Down
66 changes: 66 additions & 0 deletions src/main/java/io/vertx/core/impl/ContextBase.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright (c) 2011-2023 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.impl;

import io.vertx.core.spi.context.storage.AccessMode;
import io.vertx.core.spi.context.storage.ContextLocal;

import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Supplier;

/**
* Base class for context.
*
* @author <a href="mailto:[email protected]">Julien Viet</a>
*/
class ContextBase extends AtomicReferenceArray<Object> {

private final int localsLength;

ContextBase(int localsLength) {
super(localsLength);
this.localsLength = localsLength;
}

ContextBase(ContextBase another) {
super(another.localsLength);
this.localsLength = another.localsLength;
}

public final <T> T getLocal(ContextLocal<T> key, AccessMode accessMode) {
ContextLocalImpl<T> internalKey = (ContextLocalImpl<T>) key;
int index = internalKey.index;
if (index >= localsLength) {
throw new IllegalArgumentException();
}
Object res = accessMode.get(this, index);
return (T) res;
}

public final <T> T getLocal(ContextLocal<T> key, AccessMode accessMode, Supplier<? extends T> initialValueSupplier) {
ContextLocalImpl<T> internalKey = (ContextLocalImpl<T>) key;
int index = internalKey.index;
if (index >= localsLength) {
throw new IllegalArgumentException("Invalid key index: " + index);
}
Object res = accessMode.getOrCreate(this, index, (Supplier<Object>) initialValueSupplier);
return (T) res;
}

public final <T> void putLocal(ContextLocal<T> key, AccessMode accessMode, T value) {
ContextLocalImpl<T> internalKey = (ContextLocalImpl<T>) key;
int index = internalKey.index;
if (index >= localsLength) {
throw new IllegalArgumentException();
}
accessMode.put(this, index, value);
}
}
13 changes: 3 additions & 10 deletions src/main/java/io/vertx/core/impl/ContextImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* @author <a href="http://tfox.org">Tim Fox</a>
* @author <a href="mailto:[email protected]">Julien Viet</a>
*/
public final class ContextImpl implements ContextInternal {
public final class ContextImpl extends ContextBase implements ContextInternal {

static <T> void setResultHandler(ContextInternal ctx, Future<T> fut, Handler<AsyncResult<T>> resultHandler) {
if (resultHandler != null) {
Expand All @@ -52,14 +52,14 @@ static <T> void setResultHandler(ContextInternal ctx, Future<T> fut, Handler<Asy
private final EventLoop eventLoop;
private final EventExecutor executor;
private ConcurrentMap<Object, Object> data;
private ConcurrentMap<Object, Object> localData;
private volatile Handler<Throwable> exceptionHandler;
final TaskQueue internalOrderedTasks;
final WorkerPool internalWorkerPool;
final WorkerPool workerPool;
final TaskQueue orderedTasks;

public ContextImpl(VertxInternal vertx,
int localsLength,
ThreadingModel threadingModel,
EventLoop eventLoop,
EventExecutor executor,
Expand All @@ -69,6 +69,7 @@ public ContextImpl(VertxInternal vertx,
Deployment deployment,
CloseFuture closeFuture,
ClassLoader tccl) {
super(localsLength);
this.threadingModel = threadingModel;
this.deployment = deployment;
this.config = deployment != null ? deployment.config() : new JsonObject();
Expand Down Expand Up @@ -250,14 +251,6 @@ public synchronized ConcurrentMap<Object, Object> contextData() {
return data;
}

@Override
public synchronized ConcurrentMap<Object, Object> localContextData() {
if (localData == null) {
localData = new ConcurrentHashMap<>();
}
return localData;
}

public void reportException(Throwable t) {
Handler<Throwable> handler = exceptionHandler;
if (handler == null) {
Expand Down
71 changes: 66 additions & 5 deletions src/main/java/io/vertx/core/impl/ContextInternal.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,17 @@

import io.netty.channel.EventLoop;
import io.vertx.core.*;
import io.vertx.core.Future;
import io.vertx.core.impl.future.FailedFuture;
import io.vertx.core.impl.future.PromiseImpl;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.impl.future.SucceededFuture;
import io.vertx.core.spi.context.storage.AccessMode;
import io.vertx.core.spi.context.storage.ContextLocal;
import io.vertx.core.spi.tracing.VertxTracer;

import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.function.Supplier;

import static io.vertx.core.impl.ContextImpl.setResultHandler;

Expand All @@ -35,6 +36,8 @@
*/
public interface ContextInternal extends Context {

ContextLocal<ConcurrentMap<Object, Object>> LOCAL_MAP = new ContextLocalImpl<>(0);

/**
* @return the current context
*/
Expand Down Expand Up @@ -346,19 +349,76 @@ default boolean remove(Object key) {
/**
* @return the {@link ConcurrentMap} used to store local context data
*/
ConcurrentMap<Object, Object> localContextData();
default ConcurrentMap<Object, Object> localContextData() {
return LOCAL_MAP.get(this, ConcurrentHashMap::new);
}

/**
* Get some local data from the context.
*
* @param key the key of the data
* @param <T> the type of the data
* @return the local data
*/
default <T> T getLocal(ContextLocal<T> key) {
return getLocal(key, AccessMode.CONCURRENT);
}

/**
* Get some local data from the context.
*
* @param key the key of the data
* @param <T> the type of the data
* @return the local data
*/
<T> T getLocal(ContextLocal<T> key, AccessMode accessMode);

/**
* Get some local data from the context, when it does not exist the {@code initialValueSupplier} is called to obtain
* the initial value.
*
* <p> The {@code initialValueSupplier} might be called multiple times when multiple threads call this method concurrently.
*
* @param key the key of the data
* @param initialValueSupplier the supplier of the initial value optionally called
* @param <T> the type of the data
* @return the local data
*/
<T> T getLocal(ContextLocal<T> key, AccessMode accessMode, Supplier<? extends T> initialValueSupplier);

/**
* Put some local data in the context.
* <p>
* This can be used to share data between different handlers that share a context
*
* @param key the key of the data
* @param value the data
*/
<T> void putLocal(ContextLocal<T> key, AccessMode accessMode, T value);

/**
* Remove some local data from the context.
*
* @param key the key to remove
*/
default <T> void removeLocal(ContextLocal<T> key, AccessMode accessMode) {
putLocal(key, accessMode, null);
}

@Deprecated
@SuppressWarnings("unchecked")
@Override
default <T> T getLocal(Object key) {
return (T) localContextData().get(key);
}

@Deprecated
@Override
default void putLocal(Object key, Object value) {
localContextData().put(key, value);
}

@Deprecated
@Override
default boolean removeLocal(Object key) {
return localContextData().remove(key) != null;
Expand Down Expand Up @@ -487,4 +547,5 @@ default ContextInternal unwrap() {
default boolean isDuplicate() {
return false;
}

}
29 changes: 29 additions & 0 deletions src/main/java/io/vertx/core/impl/ContextLocalImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2011-2023 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.impl;

import io.vertx.core.spi.context.storage.ContextLocal;

/**
* @author <a href="mailto:[email protected]">Julien Viet</a>
*/
public class ContextLocalImpl<T> implements ContextLocal<T> {

final int index;

public ContextLocalImpl(int index) {
this.index = index;
}

public ContextLocalImpl() {
this.index = LocalSeq.next();
}
}
16 changes: 3 additions & 13 deletions src/main/java/io/vertx/core/impl/DuplicatedContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@
*
* @author <a href="mailto:[email protected]">Julien Viet</a>
*/
class DuplicatedContext implements ContextInternal {
final class DuplicatedContext extends ContextBase implements ContextInternal {

protected final ContextImpl delegate;
private ConcurrentMap<Object, Object> localData;
final ContextImpl delegate;

DuplicatedContext(ContextImpl delegate) {
super(delegate);
this.delegate = delegate;
}

Expand Down Expand Up @@ -116,16 +116,6 @@ public final ConcurrentMap<Object, Object> contextData() {
return delegate.contextData();
}

@Override
public final ConcurrentMap<Object, Object> localContextData() {
synchronized (this) {
if (localData == null) {
localData = new ConcurrentHashMap<>();
}
return localData;
}
}

@Override
public final <T> Future<T> executeBlockingInternal(Handler<Promise<T>> action) {
return ContextImpl.executeBlocking(this, action, delegate.internalWorkerPool, delegate.internalOrderedTasks);
Expand Down
37 changes: 37 additions & 0 deletions src/main/java/io/vertx/core/impl/LocalSeq.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2011-2023 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.impl;

import java.util.concurrent.atomic.AtomicInteger;

/**
* @author <a href="mailto:[email protected]">Julien Viet</a>
*/
class LocalSeq {

// 0 : reserved slot for local context map
private static final AtomicInteger seq = new AtomicInteger(1);

/**
* Hook for testing purposes
*/
static void reset() {
seq.set((1));
}

static int get() {
return seq.get();
}

static int next() {
return seq.getAndIncrement();
}
}
8 changes: 5 additions & 3 deletions src/main/java/io/vertx/core/impl/VertxImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ public class VertxImpl implements VertxInternal, MetricsProvider {
private final FileResolver fileResolver;
private final Map<ServerID, HttpServerImpl> sharedHttpServers = new HashMap<>();
private final Map<ServerID, NetServerImpl> sharedNetServers = new HashMap<>();
private final int contextLocalsLength;
final WorkerPool workerPool;
final WorkerPool internalWorkerPool;
final WorkerPool virtualThreaWorkerPool;
Expand Down Expand Up @@ -192,6 +193,7 @@ public class VertxImpl implements VertxInternal, MetricsProvider {
ExecutorService internalWorkerExec = executorServiceFactory.createExecutor(internalWorkerThreadFactory, internalBlockingPoolSize, internalBlockingPoolSize);
PoolMetrics internalBlockingPoolMetrics = metrics != null ? metrics.createPoolMetrics("worker", "vert.x-internal-blocking", internalBlockingPoolSize) : null;

contextLocalsLength = LocalSeq.get();
closeFuture = new CloseFuture(log);
maxEventLoopExecTime = maxEventLoopExecuteTime;
maxEventLoopExecTimeUnit = maxEventLoopExecuteTimeUnit;
Expand Down Expand Up @@ -552,7 +554,7 @@ public boolean cancelTimer(long id) {
}

private ContextImpl createEventLoopContext(EventLoop eventLoop, CloseFuture closeFuture, WorkerPool workerPool, Deployment deployment, ClassLoader tccl) {
return new ContextImpl(this, ThreadingModel.EVENT_LOOP, eventLoop, new EventLoopExecutor(eventLoop), internalWorkerPool, workerPool != null ? workerPool : this.workerPool, new TaskQueue(), deployment, closeFuture, disableTCCL ? null : tccl);
return new ContextImpl(this, contextLocalsLength, ThreadingModel.EVENT_LOOP, eventLoop, new EventLoopExecutor(eventLoop), internalWorkerPool, workerPool != null ? workerPool : this.workerPool, new TaskQueue(), deployment, closeFuture, disableTCCL ? null : tccl);
}

@Override
Expand All @@ -573,7 +575,7 @@ public ContextImpl createEventLoopContext() {
private ContextImpl createWorkerContext(EventLoop eventLoop, CloseFuture closeFuture, WorkerPool workerPool, Deployment deployment, ClassLoader tccl) {
TaskQueue orderedTasks = new TaskQueue();
WorkerPool wp = workerPool != null ? workerPool : this.workerPool;
return new ContextImpl(this, ThreadingModel.WORKER, eventLoop, new WorkerExecutor(wp, orderedTasks), internalWorkerPool, wp, orderedTasks, deployment, closeFuture, disableTCCL ? null : tccl);
return new ContextImpl(this, contextLocalsLength, ThreadingModel.WORKER, eventLoop, new WorkerExecutor(wp, orderedTasks), internalWorkerPool, wp, orderedTasks, deployment, closeFuture, disableTCCL ? null : tccl);
}

@Override
Expand All @@ -596,7 +598,7 @@ private ContextImpl createVirtualThreadContext(EventLoop eventLoop, CloseFuture
throw new IllegalStateException("This Java runtime does not support virtual threads");
}
TaskQueue orderedTasks = new TaskQueue();
return new ContextImpl(this, ThreadingModel.VIRTUAL_THREAD, eventLoop, new WorkerExecutor(virtualThreaWorkerPool, orderedTasks), internalWorkerPool, virtualThreaWorkerPool, orderedTasks, deployment, closeFuture, disableTCCL ? null : tccl);
return new ContextImpl(this, contextLocalsLength, ThreadingModel.VIRTUAL_THREAD, eventLoop, new WorkerExecutor(virtualThreaWorkerPool, orderedTasks), internalWorkerPool, virtualThreaWorkerPool, orderedTasks, deployment, closeFuture, disableTCCL ? null : tccl);
}

@Override
Expand Down
Loading

0 comments on commit 88cf77b

Please sign in to comment.