Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ULL improvements #194

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ jacocoTestReport {

test {
useJUnitPlatform()
maxHeapSize '4g'
maxHeapSize '8g'
}

jmh {
Expand Down
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.4-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.5-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
Expand Down
14 changes: 7 additions & 7 deletions gradlew
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,15 @@ if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then
case $MAX_FD in #(
max*)
# In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked.
# shellcheck disable=SC3045
# shellcheck disable=SC2039,SC3045
MAX_FD=$( ulimit -H -n ) ||
warn "Could not query maximum file descriptor limit"
esac
case $MAX_FD in #(
'' | soft) :;; #(
*)
# In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked.
# shellcheck disable=SC3045
# shellcheck disable=SC2039,SC3045
ulimit -n "$MAX_FD" ||
warn "Could not set maximum file descriptor limit to $MAX_FD"
esac
Expand Down Expand Up @@ -202,11 +202,11 @@ fi
# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'

# Collect all arguments for the java command;
# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of
# shell script including quotes and variable substitutions, so put them in
# double quotes to make sure that they get re-expanded; and
# * put everything else in single quotes, so that it's not re-expanded.
# Collect all arguments for the java command:
# * DEFAULT_JVM_OPTS, JAVA_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments,
# and any embedded shellness will be escaped.
# * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be
# treated as '${Hostname}' itself on the command line.

set -- \
"-Dorg.gradle.appname=$APP_BASE_NAME" \
Expand Down
100 changes: 32 additions & 68 deletions src/main/java/com/dynatrace/hash4j/distinctcount/UltraLogLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -351,26 +351,11 @@ public double getDistinctCountEstimate(Estimator estimator) {
// visible for testing
// returns register change probability scaled by 2^64
static long getScaledRegisterChangeProbability(byte reg, int p) {
int r = reg & 0xFF;
int r2 = r - (p << 2) - 4;
if (r2 < 0) {
long ret = 4L;
if (r2 == -2 || r2 == -8) {
ret -= 2;
}
if (r2 == -2 || r2 == -4) {
ret -= 1;
}
return ret << (62 - p);
} else {
int k = r2 >>> 2;
long ret = 0xE000000000000000L;
int y0 = r & 1;
int y1 = (r >>> 1) & 1;
ret -= (long) y0 << 63;
ret -= (long) y1 << 62;
return ret >>> (k + p);
}
if (reg == 0) return 1L << -p;
int k = (reg >> 2) - p + 1; // is in the range [0, 64-p]
int l0 = ~reg & 1;
int l1 = ~reg & 2;
return (long) (l1 | (l0 << 2) | 1) << ~k >>> p;
}

/**
Expand Down Expand Up @@ -431,30 +416,14 @@ private static final class MaximumLikelihoodEstimator implements Estimator {

// returns contribution to alpha, scaled by 2^64
private static long contribute(int r, int[] b, int p) {
int r2 = r - (p << 2) - 4;
if (r2 < 0) {
long ret = 4L;
if (r2 == -2 || r2 == -8) {
b[0] += 1;
ret -= 2;
}
if (r2 == -2 || r2 == -4) {
b[1] += 1;
ret -= 1;
}
return ret << (62 - p);
} else {
int k = r2 >>> 2;
long ret = 0xE000000000000000L;
int y0 = r & 1;
int y1 = (r >>> 1) & 1;
ret -= (long) y0 << 63;
ret -= (long) y1 << 62;
b[k] += y0;
b[k + 1] += y1;
b[k + 2] += 1;
return ret >>> (k + p);
}
if (r == 0) return 1L << -p;
int k = (r >>> 2) - p + 1; // is in the range [0, 64-p]
int l0 = ~r & 1;
int l1 = ~r & 2;
if (l0 == 0 && k - 2 >= 0) b[k - 2] += 1;
if (l1 == 0 && k - 1 >= 0) b[k - 1] += 1;
if (k >= 0) b[k] += 1;
return (long) (l1 | (l0 << 2) | 1) << ~k >>> p;
}

@Override
Expand Down Expand Up @@ -886,36 +855,31 @@ public double estimate(UltraLogLog ultraLogLog) {
final int m = state.length;
final int p = ultraLogLog.getP();

int c0 = 0;
int c4 = 0;
int c8 = 0;
int c10 = 0;
int[] c = new int[256];
for (byte reg : state) {
c[reg & 0xFF] += 1;
}

int c4w0 = 0;
int c4w1 = 0;
int c4w2 = 0;
int c4w3 = 0;
int off = (p << 2) + 4;

double sum = 0;
int off = (p << 2) + 4;
for (byte reg : state) {
int r = reg & 0xFF;
int r2 = r - off;
if (r2 < 0) {
if (r2 < -8) c0 += 1;
if (r2 == -8) c4 += 1;
if (r2 == -4) c8 += 1;
if (r2 == -2) c10 += 1;
} else if (r < 252) {
sum += REGISTER_CONTRIBUTIONS[r2];
} else {
if (r == 252) c4w0 += 1;
if (r == 253) c4w1 += 1;
if (r == 254) c4w2 += 1;
if (r == 255) c4w3 += 1;
for (int r = 0; r < 252 - off; ++r) {
int cTmp = c[r + off];
if (cTmp > 0) {
sum += cTmp * REGISTER_CONTRIBUTIONS[r];
}
}

int c0 = c[0];
int c4 = c[off - 8];
int c8 = c[off - 4];
int c10 = c[off - 2];

int c4w0 = c[252];
int c4w1 = c[253];
int c4w2 = c[254];
int c4w3 = c[255];

if (c0 > 0 || c4 > 0 || c8 > 0 || c10 > 0) {
double z = smallRangeEstimate(c0, c4, c8, c10, m);
if (c0 > 0) sum += calculateContribution0(c0, z);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.io.FileWriter;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.function.IntFunction;
import java.util.function.IntToDoubleFunction;
import java.util.function.ToDoubleBiFunction;
Expand Down Expand Up @@ -88,6 +90,7 @@ void doSimulation(

// parameters
int numCycles = 100000;
int maxParallelism = 32;
final BigInt largeScaleSimulationModeDistinctCountLimit = BigInt.fromLong(1000000);
List<BigInt> targetDistinctCounts = TestUtils.getDistinctCountValues(1e21, 0.05);

Expand All @@ -107,50 +110,65 @@ void doSimulation(
ThreadLocal.withInitial(
() -> new LocalState<>(supplier.apply(p), prgProvider.create(), hashGenerators, p));

IntStream.range(0, numCycles)
.parallel()
.forEach(
i -> {
LocalState<T> state = localStates.get();
final PseudoRandomGenerator prg = state.prg;
prg.reset(seeds[i]);
final T sketch = state.sketch;
sketch.reset();
MartingaleEstimator martingaleEstimator = new MartingaleEstimator();
final Transition[] transitions = state.transitions;
state.generateTransitions(largeScaleSimulationModeDistinctCountLimit);

BigInt trueDistinctCount = BigInt.createZero();
int transitionIndex = 0;
for (int distinctCountIndex = 0;
distinctCountIndex < targetDistinctCounts.size();
++distinctCountIndex) {
BigInt targetDistinctCount = targetDistinctCounts.get(distinctCountIndex);
BigInt limit = targetDistinctCount.copy();
limit.min(largeScaleSimulationModeDistinctCountLimit);

while (trueDistinctCount.compareTo(limit) < 0) {
sketch.add(prg.nextLong(), martingaleEstimator);
trueDistinctCount.increment();
}
if (trueDistinctCount.compareTo(targetDistinctCount) < 0) {
while (transitionIndex < transitions.length
&& transitions[transitionIndex]
.getDistinctCount()
.compareTo(targetDistinctCount)
<= 0) {
sketch.add(transitions[transitionIndex].getHash(), martingaleEstimator);
transitionIndex += 1;
}
trueDistinctCount.set(targetDistinctCount);
}

for (int k = 0; k < estimatorConfigs.size(); ++k) {
estimatedDistinctCounts[k][distinctCountIndex][i] =
estimatorConfigs.get(k).estimator.applyAsDouble(sketch, martingaleEstimator);
}
}
});
try {
ForkJoinPool forkJoinPool =
new ForkJoinPool(Math.min(ForkJoinPool.getCommonPoolParallelism(), maxParallelism));
forkJoinPool
.submit(
() ->
IntStream.range(0, numCycles)
.parallel()
.forEach(
i -> {
LocalState<T> state = localStates.get();
final PseudoRandomGenerator prg = state.prg;
prg.reset(seeds[i]);
final T sketch = state.sketch;
sketch.reset();
MartingaleEstimator martingaleEstimator = new MartingaleEstimator();
final Transition[] transitions = state.transitions;
state.generateTransitions(largeScaleSimulationModeDistinctCountLimit);

BigInt trueDistinctCount = BigInt.createZero();
int transitionIndex = 0;
for (int distinctCountIndex = 0;
distinctCountIndex < targetDistinctCounts.size();
++distinctCountIndex) {
BigInt targetDistinctCount =
targetDistinctCounts.get(distinctCountIndex);
BigInt limit = targetDistinctCount.copy();
limit.min(largeScaleSimulationModeDistinctCountLimit);

while (trueDistinctCount.compareTo(limit) < 0) {
sketch.add(prg.nextLong(), martingaleEstimator);
trueDistinctCount.increment();
}
if (trueDistinctCount.compareTo(targetDistinctCount) < 0) {
while (transitionIndex < transitions.length
&& transitions[transitionIndex]
.getDistinctCount()
.compareTo(targetDistinctCount)
<= 0) {
sketch.add(
transitions[transitionIndex].getHash(), martingaleEstimator);
transitionIndex += 1;
}
trueDistinctCount.set(targetDistinctCount);
}

for (int k = 0; k < estimatorConfigs.size(); ++k) {
estimatedDistinctCounts[k][distinctCountIndex][i] =
estimatorConfigs
.get(k)
.estimator
.applyAsDouble(sketch, martingaleEstimator);
}
}
}))
.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}

double[] theoreticalRelativeStandardErrors =
estimatorConfigs.stream()
Expand Down