Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve serialization efficiency of replicated Cache and CachedValue #392

Merged
merged 1 commit into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## 22.0-SNAPSHOT - unreleased

### ⚙️ Technical
* Improvements to serialization efficiency of replicated `Cache` and `CachedValue`

## 21.0.0 - 2024-09-03

### 💥 Breaking Changes (upgrade difficulty: 🟢 LOW - latest Hoist React + DB col additions)
Expand Down
26 changes: 4 additions & 22 deletions src/main/groovy/io/xh/hoist/cache/BaseCache.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@

package io.xh.hoist.cache

import com.hazelcast.core.EntryEvent
import com.hazelcast.core.EntryListener

import groovy.transform.CompileStatic
import io.xh.hoist.BaseService
import io.xh.hoist.cluster.ClusterService
Expand Down Expand Up @@ -53,7 +52,7 @@ abstract class BaseCache<V> {
public final boolean serializeOldValue

/** Handlers to be called on change with a {@link CacheValueChanged} object. */
public final List<Closure> onChange
public final List<Closure> onChange = []

BaseCache(
BaseService svc,
Expand All @@ -62,8 +61,7 @@ abstract class BaseCache<V> {
Closure expireFn,
Closure timestampFn,
boolean replicate,
boolean serializeOldValue,
Closure onChange
boolean serializeOldValue
) {
this.svc = svc
this.name = name
Expand All @@ -72,13 +70,10 @@ abstract class BaseCache<V> {
this.timestampFn = timestampFn
this.replicate = replicate
this.serializeOldValue = serializeOldValue
this.onChange = onChange ? [onChange] : []
}

/** @param handler called on change with a {@link CacheValueChanged} object. */
void addChangeHandler(Closure handler) {
onChange << handler
}
abstract void addChangeHandler(Closure handler)

/** Clear all values. */
abstract void clear()
Expand All @@ -90,19 +85,6 @@ abstract class BaseCache<V> {
return replicate && ClusterService.multiInstanceEnabled
}

protected EntryListener getHzEntryListener() {
Closure onChg = { EntryEvent<?, Entry<V>> it ->
fireOnChange(it.key, it.oldValue?.value, it.value?.value)
}
return [
entryAdded : onChg,
entryUpdated: onChg,
entryRemoved: onChg,
entryEvicted: onChg,
entryExpired: onChg
] as EntryListener
}

protected void fireOnChange(Object key, V oldValue, V value) {
def change = new CacheValueChanged(this, key, oldValue, value)
onChange.each { it.call(change) }
Expand Down
18 changes: 11 additions & 7 deletions src/main/groovy/io/xh/hoist/cache/Cache.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,14 @@ class Cache<K,V> extends BaseCache<V> {
@NamedParam boolean serializeOldValue = false,
@NamedParam Closure onChange = null
) {
super(svc, name, expireTime, expireFn, timestampFn, replicate, serializeOldValue, onChange)
super(svc, name, expireTime, expireFn, timestampFn, replicate, serializeOldValue)

if (replicate && !name) {
throw new IllegalArgumentException("Cannot create a replicated Cache without a unique name")
}

if (useCluster) {
_map = svc.getReplicatedMap(name)
(_map as ReplicatedMap).addEntryListener(getHzEntryListener())
} else {
_map = new ConcurrentHashMap()
}
_map = useCluster ? svc.getReplicatedMap(name) : new ConcurrentHashMap()
if (onChange) addChangeHandler(onChange)

timer = new Timer(
owner: svc,
Expand Down Expand Up @@ -131,6 +127,14 @@ class Cache<K,V> extends BaseCache<V> {
_map.each { k, v -> remove(k)}
}

void addChangeHandler(Closure handler) {
if (!onChange && _map instanceof ReplicatedMap) {
_map.addEntryListener(new HzEntryListener(this))
}
onChange << handler
}


/**
* Wait for the cache entry to be populated.
* @param key, entry to check
Expand Down
19 changes: 10 additions & 9 deletions src/main/groovy/io/xh/hoist/cache/CachedValue.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import java.util.concurrent.TimeoutException
import static io.xh.hoist.util.DateTimeUtils.SECONDS
import static io.xh.hoist.util.DateTimeUtils.intervalElapsed
import static java.lang.System.currentTimeMillis
import static java.util.Collections.emptyMap


/**
* Similar to {@link Cache}, but a single value that can be read, written, and expired.
Expand All @@ -32,14 +30,10 @@ class CachedValue<V> extends BaseCache<V> {
@NamedParam boolean serializeOldValue = false,
@NamedParam Closure onChange = null
) {
super(svc, name, expireTime, expireFn, timestampFn, replicate, serializeOldValue, onChange)
super(svc, name, expireTime, expireFn, timestampFn, replicate, serializeOldValue)

if (useCluster) {
_map = svc.replicatedCachedValuesMap
(_map as ReplicatedMap).addEntryListener(getHzEntryListener(), name)
} else {
_map = svc.localCachedValuesMap
}
_map = useCluster ? svc.replicatedCachedValuesMap : svc.localCachedValuesMap
if (onChange) addChangeHandler(onChange)
}

/** @returns the cached value. */
Expand Down Expand Up @@ -108,4 +102,11 @@ class CachedValue<V> extends BaseCache<V> {
throw new TimeoutException(msg)
}
}

void addChangeHandler(Closure handler) {
if (!onChange && _map instanceof ReplicatedMap) {
_map.addEntryListener(new HzEntryListener(this), name)
}
onChange << handler
}
}
43 changes: 43 additions & 0 deletions src/main/groovy/io/xh/hoist/cache/HzEntryListener.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.xh.hoist.cache

import com.hazelcast.core.EntryEvent
import com.hazelcast.core.EntryListener
import com.hazelcast.map.MapEvent

class HzEntryListener implements EntryListener {

private BaseCache target

HzEntryListener(BaseCache target) {
this.target = target
}

void entryAdded(EntryEvent event) {
fireEvent(event)
}

void entryEvicted(EntryEvent event) {
fireEvent(event)
}

void entryExpired(EntryEvent event) {
fireEvent(event)
}

void entryRemoved(EntryEvent event) {
fireEvent(event)
}

void entryUpdated(EntryEvent event) {
fireEvent(event)
}

void mapCleared(MapEvent event) {}

void mapEvicted(MapEvent event) {}

private fireEvent(EntryEvent event) {
target.fireOnChange(event.key, event.oldValue?.value, event.value?.value)
}

}
Loading