From 793ffacf8d8deab81b25e1a6aa272a7a61a57748 Mon Sep 17 00:00:00 2001 From: Tim Quinn Date: Wed, 17 Jul 2024 10:58:24 -0500 Subject: [PATCH] Add RW locking to better manage concurrency (#8997) * Add RW locking to better manage concurrency Signed-off-by: Tim Quinn * Address review comments --------- Signed-off-by: Tim Quinn --- .../providers/micrometer/MMeterRegistry.java | 318 ++++++++++++------ 1 file changed, 210 insertions(+), 108 deletions(-) diff --git a/metrics/providers/micrometer/src/main/java/io/helidon/metrics/providers/micrometer/MMeterRegistry.java b/metrics/providers/micrometer/src/main/java/io/helidon/metrics/providers/micrometer/MMeterRegistry.java index 2955630a5e7..201d4e5b3c8 100644 --- a/metrics/providers/micrometer/src/main/java/io/helidon/metrics/providers/micrometer/MMeterRegistry.java +++ b/metrics/providers/micrometer/src/main/java/io/helidon/metrics/providers/micrometer/MMeterRegistry.java @@ -25,7 +25,8 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; @@ -80,6 +81,11 @@ class MMeterRegistry implements io.helidon.metrics.api.MeterRegistry { private static final System.Logger LOGGER = System.getLogger(MMeterRegistry.class.getName()); private final io.micrometer.core.instrument.MeterRegistry delegate; + /* + Note that onMeterAdded and onMeterRemoved which manage these two lists do not lock; they rely on + the thread safety provided by the copy-on-write behavior. If you change the implementations here you might need to add + locking to those two methods. + */ private final List> onAddListeners = new CopyOnWriteArrayList<>(); private final List> onRemoveListeners = new CopyOnWriteArrayList<>(); @@ -102,7 +108,7 @@ class MMeterRegistry implements io.helidon.metrics.api.MeterRegistry { private final Map> scopeMembership = new HashMap<>(); private final Map> metersById = new HashMap<>(); - private final ReentrantLock lock = new ReentrantLock(); + private final ReadWriteLock lock = new ReentrantReadWriteLock(); private MMeterRegistry(io.micrometer.core.instrument.MeterRegistry delegate, MicrometerMetricsFactory metricsFactory, @@ -215,39 +221,65 @@ static MMeterRegistry applyMetersProvidersToRegistry(MetricsFactory factory, @Override public void close() { - onAddListeners.clear(); - onRemoveListeners.clear(); - List.copyOf(meters.values()).forEach(this::remove); - meters.clear(); - buildersByPromMeterId.clear(); - scopeMembership.clear(); - metersById.clear(); + lock.writeLock().lock(); + try { + onAddListeners.clear(); + onRemoveListeners.clear(); + List.copyOf(meters.values()).forEach(this::remove); + meters.clear(); + buildersByPromMeterId.clear(); + scopeMembership.clear(); + metersById.clear(); + } finally { + lock.writeLock().unlock(); + } } @Override public List meters() { - return meters.values() - .stream() + lock.readLock().lock(); + List temp; + try { + temp = new ArrayList<>(meters.values()); + } finally { + lock.readLock().unlock(); + } + return temp.stream() .map(io.helidon.metrics.api.Meter.class::cast) .toList(); } @Override public Collection meters(Predicate filter) { - return meters.values() - .stream() + lock.readLock().lock(); + List temp; + try { + temp = new ArrayList<>(meters.values()); + } finally { + lock.readLock().unlock(); + } + return temp.stream() .map(io.helidon.metrics.api.Meter.class::cast) .filter(filter) .toList(); + } @Override public Iterable scopes() { - return scopeMembership.keySet(); + lock.readLock().lock(); + try { + return new HashSet<>(scopeMembership.keySet()); + } finally { + lock.readLock().unlock(); + } } @Override public boolean isMeterEnabled(String name, Map tags, Optional scope) { + /* + This method uses only config, not any mutable data structures, so no need to lock. + */ String effectiveScope = scope.orElse(SystemTagsManager.instance().effectiveScope(scope) .orElse(io.helidon.metrics.api.Meter.Scope.DEFAULT)); return metricsConfig.enabled() @@ -278,28 +310,32 @@ public Optional meter(Class mClas String name, Iterable tags) { - Search search = delegate().find(name) - .tags(MTag.tags(tags)); - Meter match = search.meter(); + lock.readLock().lock(); + try { + Search search = delegate().find(name) + .tags(MTag.tags(tags)); + Meter match = search.meter(); - if (match == null) { - return Optional.empty(); - } - io.helidon.metrics.api.Meter neutralMeter = meters.get(match); - if (neutralMeter == null) { - LOGGER.log(Level.WARNING, String.format("Found no Helidon counterpart for Micrometer meter %s %s", - name, - Util.list(tags))); - return Optional.empty(); - } - if (mClass.isInstance(neutralMeter)) { - return Optional.of(mClass.cast(neutralMeter)); + if (match == null) { + return Optional.empty(); + } + io.helidon.metrics.api.Meter neutralMeter = meters.get(match); + if (neutralMeter == null) { + LOGGER.log(Level.WARNING, String.format("Found no Helidon counterpart for Micrometer meter %s %s", + name, + Util.list(tags))); + return Optional.empty(); + } + if (mClass.isInstance(neutralMeter)) { + return Optional.of(mClass.cast(neutralMeter)); + } + throw new IllegalArgumentException( + String.format("Matching meter is of type %s but %s was requested", + match.getClass().getName(), + mClass.getName())); + } finally { + lock.readLock().unlock(); } - throw new IllegalArgumentException( - String.format("Matching meter is of type %s but %s was requested", - match.getClass().getName(), - mClass.getName())); - } @Override @@ -343,16 +379,21 @@ io.micrometer.core.instrument.MeterRegistry delegate() { @Override public Iterable meters(Iterable scopeSelection) { - if (scopeSelection.iterator().hasNext()) { - Set result = new HashSet<>(); - for (String scope : scopeSelection) { - if (scopeMembership.containsKey(scope)) { - result.addAll(scopeMembership.get(scope)); + lock.readLock().lock(); + try { + if (scopeSelection.iterator().hasNext()) { + Set result = new HashSet<>(); + for (String scope : scopeSelection) { + if (scopeMembership.containsKey(scope)) { + result.addAll(scopeMembership.get(scope)); + } } + return result; } - return result; + return meters(); + } finally { + lock.readLock().unlock(); } - return meters(); } @Override @@ -368,7 +409,7 @@ public io.helidon.metrics.api.MeterRegistry onMeterRemoved(Consumer listener.accept(result)); - return result; - } finally { - lock.unlock(); - } - } + // No locking needed yet. If configuration (which is immutable) says the meter is disabled, we just return a no-op meter; + // we do not update any data structures. + io.helidon.metrics.api.Meter disabledMeter = noopMeterIfDisabled(builder); + if (disabledMeter != null) { + return disabledMeter; + } - io.helidon.metrics.api.Meter helidonMeter; - - if (builder instanceof MCounter.Builder cBuilder) { - helidonMeter = getOrCreate(cBuilder, cBuilder::addTag, cBuilder.delegate()::register); - } else if (builder instanceof MFunctionalCounter.Builder fcBuilder) { - helidonMeter = getOrCreate(fcBuilder, fcBuilder::addTag, fcBuilder.delegate()::register); - } else if (builder instanceof MDistributionSummary.Builder sBuilder) { - helidonMeter = getOrCreate(sBuilder, sBuilder::addTag, sBuilder.delegate()::register); - } else if (builder instanceof MGauge.Builder gBuilder) { - helidonMeter = getOrCreate(gBuilder, gBuilder::addTag, ((MGauge.Builder) gBuilder).delegate()::register); - } else if (builder instanceof MTimer.Builder tBuilder) { - helidonMeter = getOrCreate(tBuilder, tBuilder::addTag, tBuilder.delegate()::register); - } else { - throw new IllegalArgumentException(String.format("Unexpected builder type %s, expected one of %s", - builder.getClass().getName(), - List.of(MCounter.Builder.class.getName(), - MFunctionalCounter.Builder.class.getName(), - MDistributionSummary.Builder.class.getName(), - MGauge.Builder.class.getName(), - MTimer.Builder.class.getName()))); - } - return helidonMeter; - } finally { - lock.unlock(); + io.helidon.metrics.api.Meter helidonMeter; + + if (builder instanceof MCounter.Builder cBuilder) { + helidonMeter = getOrCreate(cBuilder, cBuilder::addTag, cBuilder.delegate()::register); + } else if (builder instanceof MFunctionalCounter.Builder fcBuilder) { + helidonMeter = getOrCreate(fcBuilder, fcBuilder::addTag, fcBuilder.delegate()::register); + } else if (builder instanceof MDistributionSummary.Builder sBuilder) { + helidonMeter = getOrCreate(sBuilder, sBuilder::addTag, sBuilder.delegate()::register); + } else if (builder instanceof MGauge.Builder gBuilder) { + helidonMeter = getOrCreate(gBuilder, gBuilder::addTag, ((MGauge.Builder) gBuilder).delegate()::register); + } else if (builder instanceof MTimer.Builder tBuilder) { + helidonMeter = getOrCreate(tBuilder, tBuilder::addTag, tBuilder.delegate()::register); + } else { + throw new IllegalArgumentException(String.format("Unexpected builder type %s, expected one of %s", + builder.getClass().getName(), + List.of(MCounter.Builder.class.getName(), + MFunctionalCounter.Builder.class.getName(), + MDistributionSummary.Builder.class.getName(), + MGauge.Builder.class.getName(), + MTimer.Builder.class.getName()))); } + return helidonMeter; } , M extends Meter, B, HB extends MMeter.Builder> void onMeterAdded(M addedMeter) { - lock.lock(); + + /* + We are not guaranteed that one of our own update operations--which would already hold the write lock--is triggering + this callback from Micrometer. A developer might be using the Micrometer API directly, for example. So acquire the lock + in any case. + */ + + lock.writeLock().lock(); try { /* If we originated this callback by invoking the delegate registry, then there should be a builder @@ -497,12 +534,15 @@ Signal to getOrCreate that in fact a new delegate meter was created (because we }); } finally { - lock.unlock(); + lock.writeLock().unlock(); } } void onMeterRemoved(Meter removedMeter) { - lock.lock(); + /* + See locking comment with onMeterAdded. + */ + lock.writeLock().lock(); try { MMeter removedHelidonMeter = meters.remove(removedMeter); @@ -512,10 +552,43 @@ void onMeterRemoved(Meter removedMeter) { recordRemove(removedHelidonMeter); } } finally { - lock.unlock(); + lock.writeLock().unlock(); } } + private io.helidon.metrics.api.Meter noopMeterIfDisabled(io.helidon.metrics.api.Meter.Builder builder) { + if (!isMeterEnabled(builder.name(), builder.tags(), builder.scope())) { + + io.helidon.metrics.api.Meter result = metricsFactory.noOpMeter(builder); + onAddListeners.forEach(listener -> listener.accept(result)); + return result; + } + return null; + } + + /* + * Returns an existing meter matching the specified builder metadata and ID, or null if none. + * + * The caller must have acquired either the read or write lock. + */ + private , + HM extends MMeter> MMeter meterIfRegistered(MMeter.Builder mBuilder, + io.helidon.metrics.api.Meter.Id id) { + MMeter foundMeter = metersById.get(id); + if (foundMeter != null) { + if (!mBuilder.meterType().isInstance(foundMeter)) { + throw new IllegalArgumentException("Attempt to get or create a meter of type " + + mBuilder.meterType().getName() + + " when an existing meter " + id + + " has an incompatible type " + foundMeter.getClass() + .getName()); + } + return (HM) foundMeter; + } + return null; + } + private static io.micrometer.core.instrument.MeterRegistry ensurePrometheusRegistryIsPresent( io.micrometer.core.instrument.MeterRegistry meterRegistry, MetricsConfig metricsConfig) { @@ -531,10 +604,12 @@ private static io.micrometer.core.instrument.MeterRegistry ensurePrometheusRegis return meterRegistry; } - private , HM extends MMeter> HM getOrCreate( - HB mBuilder, - Function builderTagSetter, - Function registration) { + private , + HM extends MMeter> io.helidon.metrics.api.Meter getOrCreate(HB mBuilder, + Function builderTagSetter, + Function registration) { // Select the actual scope value from the builder (if any) or a default scope value known to the system tags manager. Optional effectiveScope = SystemTagsManager.instance() @@ -546,20 +621,31 @@ private , HM extends MM io.helidon.metrics.api.Meter.Id id = mBuilder.id(); - MMeter foundMeter = metersById.get(id); - if (foundMeter != null) { - if (!mBuilder.meterType().isInstance(foundMeter)) { - throw new IllegalArgumentException("Attempt to get or create a meter of type " - + mBuilder.meterType().getName() - + " when an existing meter " + id - + " has an incompatible type " + foundMeter.getClass() - .getName()); + lock.readLock().lock(); + + try { + MMeter foundMeter = meterIfRegistered(mBuilder, id); + if (foundMeter != null) { + return (HM) foundMeter; } - return (HM) foundMeter; + } finally { + lock.readLock().unlock(); } - lock.lock(); + /* + Effectively, "promote" our lock from read (which we acquired a few lines above) to write. Because ReentrantReadWriteLock + does not actually support promoting, we have to release the read lock (which we did just above), acquire the write + lock, and recheck what we checked earlier while we had the read lock. + */ + + lock.writeLock().lock(); + try { + io.helidon.metrics.api.Meter previouslyRegisteredMeter = meterIfRegistered(mBuilder, id); + if (previouslyRegisteredMeter != null) { + return previouslyRegisteredMeter; + } + Map> pendingBuildersInScope = buildersByPromMeterId.computeIfAbsent(effectiveScope.orElse(""), k -> new HashMap<>()); @@ -594,7 +680,7 @@ but we have no record of that meter being linked to one of ours. This is surpris return result; } finally { - lock.unlock(); + lock.writeLock().unlock(); } } @@ -683,22 +769,30 @@ private Optional internalRemove(io.helidon.metrics Iterable tags = SystemTagsManager.instance().withScopeTag(id.tags(), scope); - Meter nativeMeter = delegate.find(id.name()) - .tags(MTag.tags(tags)) - .meter(); - - lock.lock(); + lock.writeLock().lock(); try { + Meter nativeMeter = delegate.find(id.name()) + .tags(MTag.tags(tags)) + .meter(); + if (nativeMeter != null) { MMeter result = meters.get(nativeMeter); delegate.remove(nativeMeter); - onRemoveListeners.forEach(listener -> listener.accept(result)); + onRemoveListeners.forEach(listener -> { + try { + listener.accept(result); + } catch (Exception ex) { + LOGGER.log(Level.WARNING, + "Error invoking onRemoveListener " + listener.getClass().getName() + "; continuing", + ex); + } + }); return Optional.of(result); } return Optional.empty(); } finally { - lock.unlock(); + lock.writeLock().unlock(); } } @@ -727,7 +821,15 @@ private MMeter recordRemove(MMeter removedHelidonMeter) { scopeMembers.remove(removedHelidonMeter); } }); - onRemoveListeners.forEach(listener -> listener.accept(removedHelidonMeter)); + onRemoveListeners.forEach(listener -> { + try { + listener.accept(removedHelidonMeter); + } catch (Exception ex) { + LOGGER.log(Level.WARNING, + "Error invoking onRemoveListener " + listener.getClass().getName() + "; continuing", + ex); + } + }); return removedHelidonMeter; }