key, AccessMode accessMode);
+
+ /**
+ * Get some local data from the context, when it does not exist the {@code initialValueSupplier} is called to obtain
+ * the initial value.
+ *
+ * The {@code initialValueSupplier} might be called multiple times when multiple threads call this method concurrently.
+ *
+ * @param key the key of the data
+ * @param initialValueSupplier the supplier of the initial value optionally called
+ * @param the type of the data
+ * @return the local data
+ */
+ T getLocal(ContextLocal key, AccessMode accessMode, Supplier extends T> initialValueSupplier);
+
+ /**
+ * Put some local data in the context.
+ *
+ * This can be used to share data between different handlers that share a context
+ *
+ * @param key the key of the data
+ * @param value the data
+ */
+ void putLocal(ContextLocal key, AccessMode accessMode, T value);
+ /**
+ * Remove some local data from the context.
+ *
+ * @param key the key to remove
+ */
+ default void removeLocal(ContextLocal key, AccessMode accessMode) {
+ putLocal(key, accessMode, null);
+ }
+
+ @Deprecated
@SuppressWarnings("unchecked")
@Override
default T getLocal(Object key) {
return (T) localContextData().get(key);
}
+ @Deprecated
@Override
default void putLocal(Object key, Object value) {
localContextData().put(key, value);
}
+ @Deprecated
@Override
default boolean removeLocal(Object key) {
return localContextData().remove(key) != null;
@@ -487,4 +547,5 @@ default ContextInternal unwrap() {
default boolean isDuplicate() {
return false;
}
+
}
diff --git a/src/main/java/io/vertx/core/impl/ContextLocalImpl.java b/src/main/java/io/vertx/core/impl/ContextLocalImpl.java
new file mode 100644
index 00000000000..02671c9a58c
--- /dev/null
+++ b/src/main/java/io/vertx/core/impl/ContextLocalImpl.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2011-2023 Contributors to the Eclipse Foundation
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+ * which is available at https://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+ */
+package io.vertx.core.impl;
+
+import io.vertx.core.spi.context.storage.ContextLocal;
+
+/**
+ * @author Julien Viet
+ */
+public class ContextLocalImpl implements ContextLocal {
+
+ final int index;
+
+ public ContextLocalImpl(int index) {
+ this.index = index;
+ }
+
+ public ContextLocalImpl() {
+ this.index = LocalSeq.next();
+ }
+}
diff --git a/src/main/java/io/vertx/core/impl/DuplicatedContext.java b/src/main/java/io/vertx/core/impl/DuplicatedContext.java
index 142fd77ebf7..aa5d7401d04 100644
--- a/src/main/java/io/vertx/core/impl/DuplicatedContext.java
+++ b/src/main/java/io/vertx/core/impl/DuplicatedContext.java
@@ -31,12 +31,12 @@
*
* @author Julien Viet
*/
-class DuplicatedContext implements ContextInternal {
+final class DuplicatedContext extends ContextBase implements ContextInternal {
- protected final ContextImpl delegate;
- private ConcurrentMap localData;
+ final ContextImpl delegate;
DuplicatedContext(ContextImpl delegate) {
+ super(delegate);
this.delegate = delegate;
}
@@ -116,16 +116,6 @@ public final ConcurrentMap contextData() {
return delegate.contextData();
}
- @Override
- public final ConcurrentMap localContextData() {
- synchronized (this) {
- if (localData == null) {
- localData = new ConcurrentHashMap<>();
- }
- return localData;
- }
- }
-
@Override
public final Future executeBlockingInternal(Handler> action) {
return ContextImpl.executeBlocking(this, action, delegate.internalWorkerPool, delegate.internalOrderedTasks);
diff --git a/src/main/java/io/vertx/core/impl/LocalSeq.java b/src/main/java/io/vertx/core/impl/LocalSeq.java
new file mode 100644
index 00000000000..b9b4a7f0d8a
--- /dev/null
+++ b/src/main/java/io/vertx/core/impl/LocalSeq.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2011-2023 Contributors to the Eclipse Foundation
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+ * which is available at https://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+ */
+package io.vertx.core.impl;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @author Julien Viet
+ */
+class LocalSeq {
+
+ // 0 : reserved slot for local context map
+ private static final AtomicInteger seq = new AtomicInteger(1);
+
+ /**
+ * Hook for testing purposes
+ */
+ static void reset() {
+ seq.set((1));
+ }
+
+ static int get() {
+ return seq.get();
+ }
+
+ static int next() {
+ return seq.getAndIncrement();
+ }
+}
diff --git a/src/main/java/io/vertx/core/impl/VertxImpl.java b/src/main/java/io/vertx/core/impl/VertxImpl.java
index 7f8ae788af2..775ccd6f286 100644
--- a/src/main/java/io/vertx/core/impl/VertxImpl.java
+++ b/src/main/java/io/vertx/core/impl/VertxImpl.java
@@ -138,6 +138,7 @@ public class VertxImpl implements VertxInternal, MetricsProvider {
private final FileResolver fileResolver;
private final Map sharedHttpServers = new HashMap<>();
private final Map sharedNetServers = new HashMap<>();
+ private final int contextLocalsLength;
final WorkerPool workerPool;
final WorkerPool internalWorkerPool;
final WorkerPool virtualThreaWorkerPool;
@@ -192,6 +193,7 @@ public class VertxImpl implements VertxInternal, MetricsProvider {
ExecutorService internalWorkerExec = executorServiceFactory.createExecutor(internalWorkerThreadFactory, internalBlockingPoolSize, internalBlockingPoolSize);
PoolMetrics internalBlockingPoolMetrics = metrics != null ? metrics.createPoolMetrics("worker", "vert.x-internal-blocking", internalBlockingPoolSize) : null;
+ contextLocalsLength = LocalSeq.get();
closeFuture = new CloseFuture(log);
maxEventLoopExecTime = maxEventLoopExecuteTime;
maxEventLoopExecTimeUnit = maxEventLoopExecuteTimeUnit;
@@ -552,7 +554,7 @@ public boolean cancelTimer(long id) {
}
private ContextImpl createEventLoopContext(EventLoop eventLoop, CloseFuture closeFuture, WorkerPool workerPool, Deployment deployment, ClassLoader tccl) {
- return new ContextImpl(this, ThreadingModel.EVENT_LOOP, eventLoop, new EventLoopExecutor(eventLoop), internalWorkerPool, workerPool != null ? workerPool : this.workerPool, new TaskQueue(), deployment, closeFuture, disableTCCL ? null : tccl);
+ return new ContextImpl(this, contextLocalsLength, ThreadingModel.EVENT_LOOP, eventLoop, new EventLoopExecutor(eventLoop), internalWorkerPool, workerPool != null ? workerPool : this.workerPool, new TaskQueue(), deployment, closeFuture, disableTCCL ? null : tccl);
}
@Override
@@ -573,7 +575,7 @@ public ContextImpl createEventLoopContext() {
private ContextImpl createWorkerContext(EventLoop eventLoop, CloseFuture closeFuture, WorkerPool workerPool, Deployment deployment, ClassLoader tccl) {
TaskQueue orderedTasks = new TaskQueue();
WorkerPool wp = workerPool != null ? workerPool : this.workerPool;
- return new ContextImpl(this, ThreadingModel.WORKER, eventLoop, new WorkerExecutor(wp, orderedTasks), internalWorkerPool, wp, orderedTasks, deployment, closeFuture, disableTCCL ? null : tccl);
+ return new ContextImpl(this, contextLocalsLength, ThreadingModel.WORKER, eventLoop, new WorkerExecutor(wp, orderedTasks), internalWorkerPool, wp, orderedTasks, deployment, closeFuture, disableTCCL ? null : tccl);
}
@Override
@@ -596,7 +598,7 @@ private ContextImpl createVirtualThreadContext(EventLoop eventLoop, CloseFuture
throw new IllegalStateException("This Java runtime does not support virtual threads");
}
TaskQueue orderedTasks = new TaskQueue();
- return new ContextImpl(this, ThreadingModel.VIRTUAL_THREAD, eventLoop, new WorkerExecutor(virtualThreaWorkerPool, orderedTasks), internalWorkerPool, virtualThreaWorkerPool, orderedTasks, deployment, closeFuture, disableTCCL ? null : tccl);
+ return new ContextImpl(this, contextLocalsLength, ThreadingModel.VIRTUAL_THREAD, eventLoop, new WorkerExecutor(virtualThreaWorkerPool, orderedTasks), internalWorkerPool, virtualThreaWorkerPool, orderedTasks, deployment, closeFuture, disableTCCL ? null : tccl);
}
@Override
diff --git a/src/main/java/io/vertx/core/spi/context/storage/AccessMode.java b/src/main/java/io/vertx/core/spi/context/storage/AccessMode.java
new file mode 100644
index 00000000000..23db44dcee8
--- /dev/null
+++ b/src/main/java/io/vertx/core/spi/context/storage/AccessMode.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright (c) 2011-2024 Contributors to the Eclipse Foundation
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+ * which is available at https://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+ */
+package io.vertx.core.spi.context.storage;
+
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.function.Supplier;
+
+/**
+ * Defines the access mode of a context local storage.
+ */
+public interface AccessMode {
+
+ /**
+ * This access mode provides concurrent access to context local storage with thread safety and atomicity.
+ */
+ AccessMode CONCURRENT = new AccessMode() {
+
+ @Override
+ public Object get(AtomicReferenceArray locals, int idx) {
+ return locals.get(idx);
+ }
+
+ @Override
+ public void put(AtomicReferenceArray locals, int idx, Object value) {
+ locals.set(idx, value);
+ }
+
+ @Override
+ public Object getOrCreate(AtomicReferenceArray locals, int idx, Supplier initialValueSupplier) {
+ Object res;
+ while (true) {
+ res = locals.get(idx);
+ if (res != null) {
+ break;
+ }
+ Object initial = initialValueSupplier.get();
+ if (initial == null) {
+ throw new IllegalStateException();
+ }
+ if (locals.compareAndSet(idx, null, initial)) {
+ res = initial;
+ break;
+ }
+ }
+ return res;
+ }
+ };
+
+ /**
+ * Return the object at index {@code idx} in the {@code locals} array.
+ * @param locals the array
+ * @param idx the index
+ * @return the object at {@code index}
+ */
+ Object get(AtomicReferenceArray locals, int idx);
+
+ /**
+ * Put {@code value} in the {@code locals} array at index {@code idx}
+ * @param locals the array
+ * @param idx the index
+ * @param value the value
+ */
+ void put(AtomicReferenceArray locals, int idx, Object value);
+
+ /**
+ * Get or create the object at index {@code index} in the {@code locals} array. When the object
+ * does not exist, {@code initialValueSupplier} must be called to obtain this value.
+ *
+ * @param locals the array
+ * @param idx the index
+ * @param initialValueSupplier the supplier of the initial value
+ */
+ Object getOrCreate(AtomicReferenceArray locals, int idx, Supplier initialValueSupplier);
+
+}
diff --git a/src/main/java/io/vertx/core/spi/context/storage/ContextLocal.java b/src/main/java/io/vertx/core/spi/context/storage/ContextLocal.java
new file mode 100644
index 00000000000..8831b74098f
--- /dev/null
+++ b/src/main/java/io/vertx/core/spi/context/storage/ContextLocal.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright (c) 2011-2023 Contributors to the Eclipse Foundation
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+ * which is available at https://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+ */
+package io.vertx.core.spi.context.storage;
+
+import io.vertx.core.Context;
+import io.vertx.core.impl.ContextInternal;
+import io.vertx.core.impl.ContextLocalImpl;
+
+import java.util.function.Supplier;
+
+/**
+ * A local storage for arbitrary data attached to a duplicated {@link Context}.
+ *
+ * Local storage should be registered before creating a {@link io.vertx.core.Vertx} instance, once registered a
+ * local storage cannot be unregistered.
+ *
+ *
It is recommended to initialize local storage as static fields of a {@link io.vertx.core.spi.VertxServiceProvider},
+ * since providers are discovered before the capture of known local storages.
+ *
+ * @author Julien Viet
+ */
+public interface ContextLocal {
+
+ /**
+ * Registers a context local storage.
+ *
+ * @return the context local storage
+ */
+ static ContextLocal registerLocal(Class type) {
+ return new ContextLocalImpl<>();
+ }
+
+ /**
+ * Get the local data from the {@code context}.
+ *
+ * @return the local data
+ */
+ default T get(Context context) {
+ return get(context, AccessMode.CONCURRENT);
+ }
+
+ /**
+ * Get the local data from the {@code context}, when it does not exist then call {@code initialValueSupplier} to obtain
+ * the initial value. The supplier can be called multiple times when several threads call this method concurrently.
+ *
+ * @param initialValueSupplier the supplier of the initial value
+ * @return the local data
+ */
+ default T get(Context context, Supplier extends T> initialValueSupplier) {
+ return get(context, AccessMode.CONCURRENT, initialValueSupplier);
+ }
+
+ /**
+ * Put local data in the {@code context}.
+ *
+ * @param data the data
+ */
+ default void put(Context context, T data) {
+ put(context, AccessMode.CONCURRENT, data);
+ }
+
+ /**
+ * Remove the local data from the context.
+ */
+ default void remove(Context context) {
+ put(context, AccessMode.CONCURRENT, null);
+ }
+
+ /**
+ * Like {@link #get(Context)} but with an {@code accessMode}.
+ */
+ default T get(Context context, AccessMode accessMode) {
+ return ((ContextInternal)context).getLocal(this, accessMode);
+ }
+
+ /**
+ * Like {@link #get(Context, Supplier)} but with an {@code accessMode}.
+ */
+ default T get(Context context, AccessMode accessMode, Supplier extends T> initialValueSupplier) {
+ return ((ContextInternal)context).getLocal(this, accessMode, initialValueSupplier);
+ }
+
+ /**
+ * Like {@link #put(Context, T)} but with an {@code accessMode}.
+ */
+ default void put(Context context, AccessMode accessMode, T value) {
+ ((ContextInternal)context).putLocal(this, accessMode, value);
+ }
+
+ /**
+ * Like {@link #remove(Context)} but with an {@code accessMode}.
+ */
+ default void remove(Context context, AccessMode accessMode) {
+ put(context, accessMode, null);
+ }
+
+}
diff --git a/src/test/benchmarks/io/vertx/core/impl/BenchmarkContext.java b/src/test/benchmarks/io/vertx/core/impl/BenchmarkContext.java
index c0076626eb1..6f5ed880402 100644
--- a/src/test/benchmarks/io/vertx/core/impl/BenchmarkContext.java
+++ b/src/test/benchmarks/io/vertx/core/impl/BenchmarkContext.java
@@ -34,6 +34,7 @@ public static ContextInternal create(Vertx vertx) {
VertxImpl impl = (VertxImpl) vertx;
return new ContextImpl(
impl,
+ 0,
ThreadingModel.WORKER,
impl.getEventLoopGroup().next(),
EXECUTOR,
diff --git a/src/test/classpath/customcontextlocal/META-INF/services/io.vertx.core.spi.VertxServiceProvider b/src/test/classpath/customcontextlocal/META-INF/services/io.vertx.core.spi.VertxServiceProvider
new file mode 100644
index 00000000000..f9b60b3d0a8
--- /dev/null
+++ b/src/test/classpath/customcontextlocal/META-INF/services/io.vertx.core.spi.VertxServiceProvider
@@ -0,0 +1 @@
+io.vertx.it.CustomContextLocal
diff --git a/src/test/java/io/vertx/core/ContextTest.java b/src/test/java/io/vertx/core/ContextTest.java
index 0e51ba579f8..ff58623783c 100644
--- a/src/test/java/io/vertx/core/ContextTest.java
+++ b/src/test/java/io/vertx/core/ContextTest.java
@@ -14,6 +14,8 @@
import io.netty.channel.EventLoop;
import io.vertx.core.impl.*;
import io.vertx.core.impl.future.PromiseInternal;
+import io.vertx.core.spi.context.storage.AccessMode;
+import io.vertx.core.spi.context.storage.ContextLocal;
import io.vertx.test.core.VertxTestBase;
import org.junit.Assume;
import org.junit.Test;
@@ -38,6 +40,7 @@
*/
public class ContextTest extends VertxTestBase {
+ private ContextLocal contextLocal;
private ExecutorService workerExecutor;
private ContextInternal createWorkerContext() {
@@ -46,12 +49,14 @@ private ContextInternal createWorkerContext() {
@Override
public void setUp() throws Exception {
+ contextLocal = ContextLocal.registerLocal(Object.class);
workerExecutor = Executors.newFixedThreadPool(2, r -> new VertxThread(r, "vert.x-worker-thread", true, 10, TimeUnit.SECONDS));
super.setUp();
}
@Override
protected void tearDown() throws Exception {
+ ContextLocalHelper.reset();
workerExecutor.shutdown();
super.tearDown();
}
@@ -478,9 +483,9 @@ private void checkDuplicate(ContextInternal ctx, ContextInternal duplicated) thr
Object shared = new Object();
Object local = new Object();
ctx.put("key", shared);
- ctx.putLocal("key", local);
+ contextLocal.put(ctx, local);
assertSame(shared, duplicated.get("key"));
- assertNull(duplicated.getLocal("key"));
+ assertNull(duplicated.getLocal(contextLocal));
assertTrue(duplicated.remove("key"));
assertNull(ctx.get("key"));
@@ -1083,4 +1088,37 @@ public void start() {
}, new DeploymentOptions().setThreadingModel(ThreadingModel.VIRTUAL_THREAD));
await();
}
+
+ @Test
+ public void testConcurrentLocalAccess() throws Exception {
+ ContextInternal ctx = (ContextInternal) vertx.getOrCreateContext();
+ int numThreads = 10;
+ Thread[] threads = new Thread[numThreads];
+ int[] values = new int[numThreads];
+ CyclicBarrier barrier = new CyclicBarrier(numThreads);
+ for (int i = 0;i < numThreads;i++) {
+ values[i] = -1;
+ int val = i;
+ Supplier supplier = () -> val;
+ threads[i] = new Thread(() -> {
+ try {
+ barrier.await();
+ } catch (Exception e) {
+ return;
+ }
+ values[val] = (int)ctx.getLocal(contextLocal, AccessMode.CONCURRENT, supplier);
+ });
+ }
+ for (int i = 0;i < numThreads;i++) {
+ threads[i].start();
+ }
+ for (int i = 0;i < numThreads;i++) {
+ threads[i].join();
+ }
+ assertTrue(values[0] >= 0);
+ for (int i = 0;i < numThreads;i++) {
+ assertEquals(values[i], values[0]);
+ }
+ }
+
}
diff --git a/src/test/java/io/vertx/core/FakeContext.java b/src/test/java/io/vertx/core/FakeContext.java
index 620f4818832..56a1d377857 100644
--- a/src/test/java/io/vertx/core/FakeContext.java
+++ b/src/test/java/io/vertx/core/FakeContext.java
@@ -10,11 +10,14 @@
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.WorkerPool;
import io.vertx.core.json.JsonObject;
+import io.vertx.core.spi.context.storage.AccessMode;
+import io.vertx.core.spi.context.storage.ContextLocal;
import io.vertx.core.spi.tracing.VertxTracer;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
+import java.util.function.Supplier;
class FakeContext implements ContextInternal {
@@ -135,22 +138,18 @@ public VertxInternal owner() {
@Override
public void emit(T argument, Handler task) {
-
}
@Override
public void execute(Runnable task) {
-
}
@Override
public void execute(T argument, Handler task) {
-
}
@Override
public void reportException(Throwable t) {
-
}
@Override
@@ -158,11 +157,6 @@ public ConcurrentMap contextData() {
return null;
}
- @Override
- public ConcurrentMap localContextData() {
- return null;
- }
-
@Override
public ClassLoader classLoader() {
return tccl;
@@ -192,4 +186,19 @@ public boolean isDeployment() {
public CloseFuture closeFuture() {
return null;
}
+
+ @Override
+ public T getLocal(ContextLocal key, AccessMode accessMode) {
+ return null;
+ }
+
+ @Override
+ public T getLocal(ContextLocal key, AccessMode accessMode, Supplier extends T> initialValueSupplier) {
+ return null;
+ }
+
+ @Override
+ public void putLocal(ContextLocal key, AccessMode accessMode, T value) {
+
+ }
}
diff --git a/src/test/java/io/vertx/core/impl/ContextLocalHelper.java b/src/test/java/io/vertx/core/impl/ContextLocalHelper.java
new file mode 100644
index 00000000000..ef10e8de551
--- /dev/null
+++ b/src/test/java/io/vertx/core/impl/ContextLocalHelper.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2011-2023 Contributors to the Eclipse Foundation
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+ * which is available at https://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+ */
+package io.vertx.core.impl;
+
+/**
+ * @author Julien Viet
+ */
+public class ContextLocalHelper {
+
+ /**
+ * Reset the context locals, only available for testing purpose.
+ */
+ public static void reset() {
+ LocalSeq.reset();
+ }
+
+}
diff --git a/src/test/java/io/vertx/core/spi/tracing/EventBusTracerTestBase.java b/src/test/java/io/vertx/core/spi/tracing/EventBusTracerTestBase.java
index fbcc3fb9611..e821e6efa6d 100644
--- a/src/test/java/io/vertx/core/spi/tracing/EventBusTracerTestBase.java
+++ b/src/test/java/io/vertx/core/spi/tracing/EventBusTracerTestBase.java
@@ -15,24 +15,38 @@
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.ReplyException;
-import io.vertx.core.impl.ContextInternal;
+import io.vertx.core.impl.ContextLocalHelper;
+import io.vertx.core.spi.context.storage.ContextLocal;
import io.vertx.core.tracing.TracingPolicy;
-import io.vertx.test.core.TestUtils;
import io.vertx.test.core.VertxTestBase;
import org.junit.Test;
import java.util.*;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiConsumer;
public abstract class EventBusTracerTestBase extends VertxTestBase {
+ ContextLocal receiveKey;
+ ContextLocal sendKey;
VertxTracer tracer;
Vertx vertx1;
Vertx vertx2;
+ @Override
+ public void setUp() throws Exception {
+ receiveKey = ContextLocal.registerLocal(Object.class);
+ sendKey = ContextLocal.registerLocal(Object.class);
+ super.setUp();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ ContextLocalHelper.reset();
+ super.tearDown();
+ }
+
@Override
protected VertxTracer getTracer() {
return tracer = new VertxTracer() {
@@ -56,11 +70,8 @@ public void receiveResponse(Context context, Object response, Object payload, Th
}
class EventBusTracer implements VertxTracer {
-
- final String receiveKey = TestUtils.randomAlphaString(10);
final Object receiveVal = new Object();
final Object receiveTrace = new Object();
- final String sendKey = TestUtils.randomAlphaString(10);
final Object sendVal = new Object();
final Object sendTrace = new Object();
final List sendEvents = new CopyOnWriteArrayList<>();
@@ -82,7 +93,7 @@ private String addressOf(T obj, TagExtractor extractor) {
@Override
public Object receiveRequest(Context context, SpanKind kind, TracingPolicy policy, R request, String operation, Iterable> headers, TagExtractor tagExtractor) {
- context.putLocal(receiveKey, receiveVal);
+ receiveKey.put(context, receiveVal);
Object body = ((Message)request).body();
receiveEvents.add("receiveRequest[" + addressOf(request, tagExtractor) + "]");
return receiveTrace;
@@ -91,14 +102,13 @@ public Object receiveRequest(Context context, SpanKind kind, TracingPolicy p
@Override
public void sendResponse(Context context, R response, Object payload, Throwable failure, TagExtractor tagExtractor) {
assertSame(receiveTrace, payload);
- assertSame(receiveVal, context.getLocal(receiveKey));
- assertTrue(context.removeLocal(receiveKey));
+ assertSame(receiveVal, receiveKey.get(context));
receiveEvents.add("sendResponse[]");
}
@Override
public Object sendRequest(Context context, SpanKind kind, TracingPolicy policy, R request, String operation, BiConsumer headers, TagExtractor tagExtractor) {
- assertSame(sendVal, context.getLocal(sendKey));
+ assertSame(sendVal, sendKey.get(context));
sendEvents.add("sendRequest[" + addressOf(request, tagExtractor) + "]");
assertTrue(request instanceof Message>);
return sendTrace;
@@ -107,8 +117,7 @@ public Object sendRequest(Context context, SpanKind kind, TracingPolicy poli
@Override
public void receiveResponse(Context context, R response, Object payload, Throwable failure, TagExtractor tagExtractor) {
assertSame(sendTrace, payload);
- assertSame(sendVal, context.getLocal(sendKey));
- assertTrue(context.removeLocal(sendKey));
+ assertSame(sendVal, sendKey.get(context));
if (failure != null) {
assertTrue(failure instanceof ReplyException);
ReplyException replyException = (ReplyException) failure;
@@ -138,8 +147,7 @@ public void testEventBusSend() throws Exception {
awaitLatch(latch);
vertx1.runOnContext(v -> {
Context ctx = vertx1.getOrCreateContext();
- ConcurrentMap tracerMap = ((ContextInternal) ctx).localContextData();
- tracerMap.put(ebTracer.sendKey, ebTracer.sendVal);
+ sendKey.put(ctx, ebTracer.sendVal);
vertx1.eventBus().send("the_address", "msg");
});
waitUntil(() -> ebTracer.sendEvents.size() + ebTracer.receiveEvents.size() == 4);
@@ -153,8 +161,7 @@ public void testEventBusSendNoConsumer() {
tracer = ebTracer;
Context ctx = vertx1.getOrCreateContext();
ctx.runOnContext(v -> {
- ConcurrentMap tracerMap = ((ContextInternal) ctx).localContextData();
- tracerMap.put(ebTracer.sendKey, ebTracer.sendVal);
+ sendKey.put(ctx, ebTracer.sendVal);
vertx1.eventBus().send("the_address", "msg");
});
waitUntil(() -> ebTracer.sendEvents.size() + ebTracer.receiveEvents.size() == 2);
@@ -173,8 +180,7 @@ public void testEventBusRequestReply() throws Exception {
assertNotSame(ctx, vertx2.getOrCreateContext());
assertSameEventLoop(ctx, vertx2.getOrCreateContext());
assertEquals("msg_1", msg.body());
- ConcurrentMap tracerMap = ((ContextInternal) vertx.getOrCreateContext()).localContextData();
- tracerMap.put(ebTracer.sendKey, ebTracer.sendVal);
+ sendKey.put(vertx.getOrCreateContext(), ebTracer.sendVal);
msg.reply("msg_2");
}).completionHandler(onSuccess(v2 -> {
latch.countDown();
@@ -183,8 +189,7 @@ public void testEventBusRequestReply() throws Exception {
awaitLatch(latch);
vertx1.runOnContext(v -> {
Context ctx = vertx1.getOrCreateContext();
- ConcurrentMap tracerMap = ((ContextInternal) ctx).localContextData();
- tracerMap.put(ebTracer.sendKey, ebTracer.sendVal);
+ sendKey.put(ctx, ebTracer.sendVal);
vertx1.eventBus().request("the_address", "msg_1", onSuccess(reply -> {
assertSame(ctx, vertx1.getOrCreateContext());
assertSameEventLoop(ctx, vertx1.getOrCreateContext());
@@ -202,8 +207,7 @@ public void testEventBusRequestReplyFailure() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
vertx1.eventBus().consumer("the_address", msg -> {
assertEquals("msg", msg.body());
- ConcurrentMap tracerMap = ((ContextInternal) vertx.getOrCreateContext()).localContextData();
- tracerMap.put(ebTracer.sendKey, ebTracer.sendVal);
+ sendKey.put(vertx.getOrCreateContext(), ebTracer.sendVal);
msg.fail(10, "it failed");
}).completionHandler(onSuccess(v -> {
latch.countDown();
@@ -211,9 +215,8 @@ public void testEventBusRequestReplyFailure() throws Exception {
awaitLatch(latch);
Context ctx = vertx2.getOrCreateContext();
ctx.runOnContext(v1 -> {
- ConcurrentMap tracerMap = ((ContextInternal) ctx).localContextData();
- tracerMap.put(ebTracer.sendKey, ebTracer.sendVal);
- vertx2.eventBus().request("the_address", "msg", onFailure(failure -> {
+ sendKey.put(ctx, ebTracer.sendVal);
+ vertx2.eventBus().request("the_address", "msg").onComplete(onFailure(failure -> {
}));
});
waitUntil(() -> ebTracer.sendEvents.size() + ebTracer.receiveEvents.size() == 4);
@@ -227,9 +230,8 @@ public void testEventBusRequestNoConsumer() {
tracer = ebTracer;
Context ctx = vertx2.getOrCreateContext();
ctx.runOnContext(v -> {
- ConcurrentMap tracerMap = ((ContextInternal) ctx).localContextData();
- tracerMap.put(ebTracer.sendKey, ebTracer.sendVal);
- vertx2.eventBus().request("the_address", "msg", onFailure(failure -> { }));
+ sendKey.put(ctx, ebTracer.sendVal);
+ vertx2.eventBus().request("the_address", "msg").onComplete(onFailure(failure -> { }));
});
waitUntil(() -> ebTracer.sendEvents.size() + ebTracer.receiveEvents.size() == 2);
assertEquals(Arrays.asList("sendRequest[the_address]", "receiveResponse[NO_HANDLERS]"), ebTracer.sendEvents);
@@ -249,9 +251,8 @@ public void testEventBusRequestTimeout() throws Exception {
awaitLatch(latch);
Context ctx = vertx2.getOrCreateContext();
ctx.runOnContext(v1 -> {
- ConcurrentMap tracerMap = ((ContextInternal) ctx).localContextData();
- tracerMap.put(ebTracer.sendKey, ebTracer.sendVal);
- vertx2.eventBus().request("the_address", "msg", new DeliveryOptions().setSendTimeout(100), onFailure(failure -> {
+ sendKey.put(ctx, ebTracer.sendVal);
+ vertx2.eventBus().request("the_address", "msg", new DeliveryOptions().setSendTimeout(100)).onComplete(onFailure(failure -> {
}));
});
waitUntil(() -> ebTracer.sendEvents.size() + ebTracer.receiveEvents.size() == 3);
@@ -271,9 +272,8 @@ public void testEventBusRequestReplyReply() throws Exception {
assertNotSame(ctx, consumerCtx);
assertSameEventLoop(ctx, consumerCtx);
assertEquals("msg_1", msg.body());
- ConcurrentMap tracerMap = ((ContextInternal) vertx.getOrCreateContext()).localContextData();
- tracerMap.put(ebTracer.sendKey, ebTracer.sendVal);
- msg.replyAndRequest("msg_2", reply -> {
+ sendKey.put(vertx.getOrCreateContext(), ebTracer.sendVal);
+ msg.replyAndRequest("msg_2").onComplete(reply -> {
assertSame(consumerCtx, vertx2.getOrCreateContext());
assertSameEventLoop(consumerCtx, vertx2.getOrCreateContext());
});
@@ -284,11 +284,10 @@ public void testEventBusRequestReplyReply() throws Exception {
awaitLatch(latch);
vertx1.runOnContext(v -> {
Context ctx = vertx1.getOrCreateContext();
- ConcurrentMap tracerMap = ((ContextInternal) ctx).localContextData();
- tracerMap.put(ebTracer.sendKey, ebTracer.sendVal);
- vertx1.eventBus().request("the_address", "msg_1", onSuccess(reply -> {
+ sendKey.put(ctx, ebTracer.sendVal);
+ vertx1.eventBus().request("the_address", "msg_1").onComplete(onSuccess(reply -> {
assertSame(Vertx.currentContext(), ctx);
- tracerMap.put(ebTracer.sendKey, ebTracer.sendVal);
+ sendKey.put(ctx, ebTracer.sendVal);
reply.reply("msg_3");
}));
});
diff --git a/src/test/java/io/vertx/core/spi/tracing/HttpTracerTestBase.java b/src/test/java/io/vertx/core/spi/tracing/HttpTracerTestBase.java
index 5874d048daa..68275d9bb85 100644
--- a/src/test/java/io/vertx/core/spi/tracing/HttpTracerTestBase.java
+++ b/src/test/java/io/vertx/core/spi/tracing/HttpTracerTestBase.java
@@ -17,14 +17,14 @@
import io.vertx.core.http.HttpTestBase;
import io.vertx.core.http.RequestOptions;
import io.vertx.core.impl.ContextInternal;
+import io.vertx.core.impl.ContextLocalHelper;
+import io.vertx.core.spi.context.storage.ContextLocal;
import io.vertx.core.spi.observability.HttpRequest;
import io.vertx.core.spi.observability.HttpResponse;
import io.vertx.core.tracing.TracingPolicy;
-import io.vertx.test.core.TestUtils;
import org.junit.Test;
import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
@@ -32,6 +32,19 @@
public abstract class HttpTracerTestBase extends HttpTestBase {
private VertxTracer tracer;
+ private ContextLocal key;
+
+ @Override
+ public void setUp() throws Exception {
+ key = ContextLocal.registerLocal(Object.class);
+ super.setUp();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ ContextLocalHelper.reset();
+ super.tearDown();
+ }
@Override
protected VertxTracer getTracer() {
@@ -40,14 +53,13 @@ protected VertxTracer getTracer() {
@Test
public void testHttpServer() throws Exception {
- String key = TestUtils.randomAlphaString(10);
Object val = new Object();
AtomicInteger seq = new AtomicInteger();
setTracer(new VertxTracer() {
@Override
public Object receiveRequest(Context context, SpanKind kind, TracingPolicy policy, Object request, String operation, Iterable headers, TagExtractor tagExtractor) {
- assertNull(context.getLocal(key));
- context.putLocal(key, val);
+ assertNull(key.get(context));
+ key.put(context, val);
assertTrue(seq.compareAndSet(0, 1));
return request;
}
@@ -57,17 +69,17 @@ public void sendResponse(Context context, Object response, Object payload, Throw
assertNotNull(response);
assertTrue(response instanceof HttpServerResponse);
assertNull(failure);
- assertSame(val, context.getLocal(key));
- assertTrue(context.removeLocal(key));
+ assertSame(val, key.get(context));
+ key.remove(context);
}
});
CountDownLatch latch = new CountDownLatch(1);
server.requestHandler(req -> {
assertEquals(1, seq.get());
ContextInternal ctx = (ContextInternal) Vertx.currentContext();
- assertSame(val, ctx.localContextData().get(key));
+ assertSame(val, key.get(ctx));
req.response().closeHandler(v -> {
- assertNull(ctx.localContextData().get(key));
+ assertNull(key.get(ctx));
assertEquals(2, seq.get());
});
req.response().end();
@@ -86,14 +98,13 @@ public void sendResponse(Context context, Object response, Object payload, Throw
@Test
public void testHttpServerError() throws Exception {
waitFor(3);
- String key = TestUtils.randomAlphaString(10);
Object val = new Object();
AtomicInteger seq = new AtomicInteger();
setTracer(new VertxTracer() {
@Override
public Object receiveRequest(Context context, SpanKind kind, TracingPolicy policy, Object request, String operation, Iterable headers, TagExtractor tagExtractor) {
- assertNull(context.getLocal(key));
- context.putLocal(key, val);
+ assertNull(key.get(context));
+ key.put(context, val);
assertTrue(seq.compareAndSet(0, 1));
return request;
}
@@ -102,7 +113,6 @@ public void sendResponse(Context context, Object response, Object payload, Throw
assertTrue(seq.compareAndSet(1, 2));
assertNull(response);
assertNotNull(failure);
- assertTrue(context.removeLocal(key));
complete();
}
});
@@ -110,7 +120,7 @@ public void sendResponse(Context context, Object response, Object payload, Throw
server.requestHandler(req -> {
assertEquals(1, seq.get());
ContextInternal ctx = (ContextInternal) Vertx.currentContext();
- assertSame(val, ctx.localContextData().get(key));
+ assertSame(val, key.get(ctx));
req.exceptionHandler(v -> {
// assertNull(ctx.localContextData().get(key));
// assertEquals(2, seq.get());
@@ -147,14 +157,13 @@ public void testHttpClientRequestOverrideOperation() throws Exception {
}
private void testHttpClientRequest(RequestOptions request, String expectedOperation) throws Exception {
- String key = TestUtils.randomAlphaString(10);
Object val = new Object();
AtomicInteger seq = new AtomicInteger();
String traceId = UUID.randomUUID().toString();
setTracer(new VertxTracer() {
@Override
public Object sendRequest(Context context, SpanKind kind, TracingPolicy policy, Object request, String operation, BiConsumer headers, TagExtractor tagExtractor) {
- assertSame(val, context.getLocal(key));
+ assertSame(val, key.get(context));
assertTrue(seq.compareAndSet(0, 1));
headers.accept("X-B3-TraceId", traceId);
assertNotNull(request);
@@ -164,8 +173,8 @@ public Object sendRequest(Context context, SpanKind kind, TracingPolicy policy,
}
@Override
public void receiveResponse(Context context, Object response, Object payload, Throwable failure, TagExtractor tagExtractor) {
- assertSame(val, context.getLocal(key));
- assertTrue(context.removeLocal(key));
+ assertSame(val, key.get(context));
+ key.remove(context);
assertNotNull(response);
assertTrue(response instanceof HttpResponse);
assertNull(failure);
@@ -182,15 +191,14 @@ public void receiveResponse(Context context, Object response, Object payload, Th
awaitLatch(latch);
Context ctx = vertx.getOrCreateContext();
ctx.runOnContext(v1 -> {
- ConcurrentMap tracerMap = ((ContextInternal) ctx).localContextData();
- tracerMap.put(key, val);
+ key.put(ctx, val);
client.request(request, onSuccess(req -> {
- req.send(onSuccess(resp -> {
+ req.send().onComplete(onSuccess(resp -> {
resp.endHandler(v2 -> {
// Updates are done on the HTTP client context, so we need to run task on this context
// to avoid data race
ctx.runOnContext(v -> {
- assertNull(tracerMap.get(key));
+ assertNull(key.get(ctx));
testComplete();
});
});
@@ -203,22 +211,21 @@ public void receiveResponse(Context context, Object response, Object payload, Th
@Test
public void testHttpClientError() throws Exception {
waitFor(2);
- String key = TestUtils.randomAlphaString(10);
Object val = new Object();
AtomicInteger seq = new AtomicInteger();
String traceId = UUID.randomUUID().toString();
setTracer(new VertxTracer() {
@Override
public Object sendRequest(Context context, SpanKind kind, TracingPolicy policy, Object request, String operation, BiConsumer headers, TagExtractor tagExtractor) {
- assertSame(val, context.getLocal(key));
+ assertSame(val, key.get(context));
assertTrue(seq.compareAndSet(0, 1));
headers.accept("X-B3-TraceId", traceId);
return request;
}
@Override
public void receiveResponse(Context context, Object response, Object payload, Throwable failure, TagExtractor tagExtractor) {
- assertSame(val, context.getLocal(key));
- assertTrue(context.removeLocal(key));
+ assertSame(val, key.get(context));
+ key.remove(context);
assertNull(response);
assertNotNull(failure);
assertTrue(seq.compareAndSet(1, 2));
@@ -235,8 +242,7 @@ public void receiveResponse(Context context, Object response, Object payload, Th
awaitLatch(latch);
Context ctx = vertx.getOrCreateContext();
ctx.runOnContext(v1 -> {
- ConcurrentMap tracerMap = ((ContextInternal) ctx).localContextData();
- tracerMap.put(key, val);
+ key.put(ctx, val);
client.request(HttpMethod.GET, DEFAULT_HTTP_PORT, "localhost", "/").onComplete(onSuccess(req -> {
req.send().onComplete(onFailure(err -> {
// assertNull(tracerMap.get(key));
diff --git a/src/test/java/io/vertx/core/spi/tracing/LocalEventBusTracerTest.java b/src/test/java/io/vertx/core/spi/tracing/LocalEventBusTracerTest.java
index 44fffc93aec..e1e0e1f48c9 100644
--- a/src/test/java/io/vertx/core/spi/tracing/LocalEventBusTracerTest.java
+++ b/src/test/java/io/vertx/core/spi/tracing/LocalEventBusTracerTest.java
@@ -32,15 +32,15 @@ public void testInboundInterceptor() throws Exception {
tracer = new VertxTracer() {};
vertx2.eventBus().addInboundInterceptor(deliveryCtx -> {
ContextInternal ctx = (ContextInternal) vertx.getOrCreateContext();
- ctx.localContextData().put("key", "val");
+// ctx.putLocal(FakeTracer.ACTIVE_SCOPE_KEY, "val");
deliveryCtx.next();
});
Context receiveCtx = vertx2.getOrCreateContext();
CountDownLatch latch = new CountDownLatch(1);
receiveCtx.runOnContext(v -> {
vertx2.eventBus().consumer("the_address", msg -> {
- Object val = ((ContextInternal) vertx.getOrCreateContext()).localContextData().get("key");
- assertEquals("val", val);
+// Object val = vertx.getOrCreateContext().getLocal(FakeTracer.ACTIVE_SCOPE_KEY);
+// assertEquals("val", val);
testComplete();
});
latch.countDown();
diff --git a/src/test/java/io/vertx/it/CustomContextLocal.java b/src/test/java/io/vertx/it/CustomContextLocal.java
new file mode 100644
index 00000000000..5f01ed44a31
--- /dev/null
+++ b/src/test/java/io/vertx/it/CustomContextLocal.java
@@ -0,0 +1,16 @@
+package io.vertx.it;
+
+import io.vertx.core.impl.VertxBuilder;
+import io.vertx.core.spi.VertxServiceProvider;
+import io.vertx.core.spi.context.storage.ContextLocal;
+
+public class CustomContextLocal implements VertxServiceProvider {
+
+ public static ContextLocal CUSTOM_LOCAL = ContextLocal.registerLocal(Object.class);
+ public static volatile boolean initialized;
+
+ @Override
+ public void init(VertxBuilder builder) {
+ initialized = true;
+ }
+}
diff --git a/src/test/java/io/vertx/it/CustomContextLocalTest.java b/src/test/java/io/vertx/it/CustomContextLocalTest.java
new file mode 100644
index 00000000000..9b03b0a1f0f
--- /dev/null
+++ b/src/test/java/io/vertx/it/CustomContextLocalTest.java
@@ -0,0 +1,17 @@
+package io.vertx.it;
+
+import io.vertx.core.Context;
+import io.vertx.test.core.VertxTestBase;
+import org.junit.Test;
+
+public class CustomContextLocalTest extends VertxTestBase {
+
+ @Test
+ public void testResolver() {
+ assertTrue(CustomContextLocal.initialized);
+ Context context = vertx.getOrCreateContext();
+ Object o = new Object();
+ CustomContextLocal.CUSTOM_LOCAL.put(context, o);
+ assertSame(o, CustomContextLocal.CUSTOM_LOCAL.get(context));
+ }
+}
diff --git a/src/test/java/io/vertx/test/faketracer/FakeTracer.java b/src/test/java/io/vertx/test/faketracer/FakeTracer.java
index cf30b3a7c9f..ff7af3e5adc 100644
--- a/src/test/java/io/vertx/test/faketracer/FakeTracer.java
+++ b/src/test/java/io/vertx/test/faketracer/FakeTracer.java
@@ -13,6 +13,8 @@
import io.vertx.core.Context;
import io.vertx.core.Vertx;
+import io.vertx.core.impl.ContextLocalHelper;
+import io.vertx.core.spi.context.storage.ContextLocal;
import io.vertx.core.spi.tracing.SpanKind;
import io.vertx.core.spi.tracing.TagExtractor;
import io.vertx.core.spi.tracing.VertxTracer;
@@ -30,8 +32,7 @@
*/
public class FakeTracer implements VertxTracer {
- private static final String ACTIVE_SCOPE_KEY = "active.scope";
-
+ private final ContextLocal scopeKey = ContextLocal.registerLocal(Scope.class);
private AtomicInteger idGenerator = new AtomicInteger(0);
List finishedSpans = new CopyOnWriteArrayList<>();
private AtomicInteger closeCount = new AtomicInteger();
@@ -53,7 +54,7 @@ public Span activeSpan() {
}
public Span activeSpan(Context data) {
- Scope scope = data.getLocal(ACTIVE_SCOPE_KEY);
+ Scope scope = scopeKey.get(data);
return scope != null ? scope.wrapped : null;
}
@@ -62,9 +63,9 @@ public Scope activate(Span span) {
}
public Scope activate(Context context, Span span) {
- Scope toRestore = context.getLocal(ACTIVE_SCOPE_KEY);
+ Scope toRestore = scopeKey.get(context);
Scope active = new Scope(this, span, toRestore);
- context.putLocal(ACTIVE_SCOPE_KEY, active);
+ scopeKey.put(context, active);
return active;
}
@@ -174,6 +175,7 @@ public List getFinishedSpans() {
@Override
public void close() {
closeCount.incrementAndGet();
+ ContextLocalHelper.reset();
}
public int closeCount() {