Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Compare fingerprints by hash" #1150

Merged
merged 1 commit into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,11 @@ public class FingerprintDAO extends PanacheEntityBase {
@Column(columnDefinition = "jsonb")
public JsonNode fingerprint;

public Integer fp_hash;

@Override
public String toString() {
return "FP{" +
"datasetId=" + datasetId +
", fingerprint=" + fingerprint +
", fp_hash=" + fp_hash +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -586,19 +586,18 @@ void onNewDataPoint(DataPoint.Event event) {
log.debugf("Processing new datapoint for dataset %d at %s, variable %d (%s), value %f",
dataPoint.datasetId, dataPoint.timestamp,
variable.id, variable.name, dataPoint.value);
FingerprintDAO fingerprint = FingerprintDAO.<FingerprintDAO>findByIdOptional(dataPoint.datasetId).orElse(null);
Integer fpHash = fingerprint != null ? fingerprint.fp_hash : null;
JsonNode fingerprint = FingerprintDAO.<FingerprintDAO>findByIdOptional(dataPoint.datasetId).map(fp -> fp.fingerprint).orElse(null);

VarAndFingerprint key = new VarAndFingerprint(variable.id, fpHash);
log.debugf("Invalidating variable %d FP %s timestamp %s, current value is %s", variable.id, fingerprint == null ? null : fingerprint.fingerprint, dataPoint.timestamp, validUpTo.get(key));
VarAndFingerprint key = new VarAndFingerprint(variable.id, fingerprint);
log.debugf("Invalidating variable %d FP %s timestamp %s, current value is %s", variable.id, fingerprint, dataPoint.timestamp, validUpTo.get(key));
validUpTo.compute(key, (ignored, current) -> {
if (current == null || !dataPoint.timestamp.isAfter(current.timestamp)) {
return new UpTo(dataPoint.timestamp, false);
} else {
return current;
}
});
runChangeDetection(VariableDAO.findById(variable.id), fpHash, event.notify, true);
runChangeDetection(VariableDAO.findById(variable.id), fingerprint, event.notify, true);
} else {
log.warnf("Could not process new datapoint for dataset %d at %s, could not find variable by id %d ",
dataPoint.datasetId, dataPoint.timestamp, dataPoint.variable == null ? -1 : dataPoint.variable.id);
Expand All @@ -611,24 +610,19 @@ void onNewDataPoint(DataPoint.Event event) {

@WithRoles(extras = Roles.HORREUM_SYSTEM)
@Transactional
void tryRunChangeDetection(VariableDAO variable, Integer fpHash, boolean notify) {
runChangeDetection(variable, fpHash, notify, false);
void tryRunChangeDetection(VariableDAO variable, JsonNode fingerprint, boolean notify) {
runChangeDetection(variable, fingerprint, notify, false);
}

private void runChangeDetection(VariableDAO variable, Integer fpHash, boolean notify, boolean expectExists) {
UpTo valid = validUpTo.get(new VarAndFingerprint(variable.id, fpHash));
Instant nextTimestamp = session.createNativeQuery("""
SELECT MIN(timestamp)
FROM datapoint dp
LEFT JOIN fingerprint fp ON dp.dataset_id = fp.dataset_id
WHERE dp.variable_id = ?1
AND (timestamp > ?2 OR (timestamp = ?2 AND ?3))
AND fp_hash = ?4
""", Instant.class)
private void runChangeDetection(VariableDAO variable, JsonNode fingerprint, boolean notify, boolean expectExists) {
UpTo valid = validUpTo.get(new VarAndFingerprint(variable.id, fingerprint));
Instant nextTimestamp = session.createNativeQuery(
"SELECT MIN(timestamp) FROM datapoint dp LEFT JOIN fingerprint fp ON dp.dataset_id = fp.dataset_id " +
"WHERE dp.variable_id = ?1 AND (timestamp > ?2 OR (timestamp = ?2 AND ?3)) AND json_equals(fp.fingerprint, ?4)", Instant.class)
.setParameter(1, variable.id)
.setParameter(2, valid != null ? valid.timestamp : LONG_TIME_AGO, StandardBasicTypes.INSTANT)
.setParameter(3, valid == null || !valid.inclusive)
.setParameter(4, fpHash)
.setParameter(4, fingerprint, JsonBinaryType.INSTANCE)
.getResultStream().filter(Objects::nonNull).findFirst().orElse(null);
if (nextTimestamp == null) {
log.debugf("No further datapoints for change detection");
Expand All @@ -637,39 +631,27 @@ SELECT MIN(timestamp)

// this should happen only after reboot, let's start with last change
if (valid != null) {
int numDeleted = session.createNativeQuery("""
DELETE
FROM change cc
WHERE cc.id IN (
SELECT id
FROM change c
LEFT JOIN fingerprint fp ON c.dataset_id = fp.dataset_id
WHERE NOT c.confirmed
AND c.variable_id = ?1
AND (c.timestamp > ?2 OR (c.timestamp = ?2 AND ?3))
AND fp.fp_hash = ?4)
""", int.class)
int numDeleted = session.createNativeQuery("DELETE FROM change cc WHERE cc.id IN (" +
"SELECT id FROM change c LEFT JOIN fingerprint fp ON c.dataset_id = fp.dataset_id " +
"WHERE NOT c.confirmed AND c.variable_id = ?1 AND (c.timestamp > ?2 OR (c.timestamp = ?2 AND ?3)) " +
"AND json_equals(fp.fingerprint, ?4))", int.class)
.setParameter(1, variable.id)
.setParameter(2, valid.timestamp, StandardBasicTypes.INSTANT)
.setParameter(3, !valid.inclusive)
.setParameter(4, fpHash)
.setParameter(4, fingerprint, JsonBinaryType.INSTANCE)
.executeUpdate();
log.debugf("Deleted %d changes %s %s for variable %d, fingerprint %s", numDeleted, valid.inclusive ? ">" : ">=", valid.timestamp, variable.id, fpHash);
log.debugf("Deleted %d changes %s %s for variable %d, fingerprint %s", numDeleted, valid.inclusive ? ">" : ">=", valid.timestamp, variable.id, fingerprint);
}

var changeQuery = session.createQuery("""
SELECT c FROM Change c
LEFT JOIN Fingerprint fp ON c.dataset.id = fp.dataset.id
WHERE c.variable = ?1
AND (c.timestamp < ?2 OR (c.timestamp = ?2 AND ?3 = TRUE))
AND fp.fp_hash = ?4
ORDER by c.timestamp DESC
""", ChangeDAO.class);
var changeQuery = session.createQuery("SELECT c FROM Change c LEFT JOIN Fingerprint fp ON c.dataset.id = fp.dataset.id " +
"WHERE c.variable = ?1 AND (c.timestamp < ?2 OR (c.timestamp = ?2 AND ?3 = TRUE)) AND " +
"TRUE = function('json_equals', fp.fingerprint, ?4) " +
"ORDER by c.timestamp DESC", ChangeDAO.class);
changeQuery
.setParameter(1, variable)
.setParameter(2, valid != null ? valid.timestamp : VERY_DISTANT_FUTURE)
.setParameter(3, valid == null || valid.inclusive)
.setParameter(4, fpHash);
.setParameter(4, fingerprint, JsonBinaryType.INSTANCE);
ChangeDAO lastChange = changeQuery.setMaxResults(1).getResultStream().findFirst().orElse(null);

Instant changeTimestamp = LONG_TIME_AGO;
Expand All @@ -682,12 +664,12 @@ WHERE cc.id IN (
"SELECT dp FROM DataPoint dp LEFT JOIN Fingerprint fp ON dp.dataset.id = fp.dataset.id " +
"JOIN dp.dataset " + // ignore datapoints (that were not deleted yet) from deleted datasets
"WHERE dp.variable = ?1 AND dp.timestamp BETWEEN ?2 AND ?3 " +
"AND fp.fp_hash = ?4 " +
"AND TRUE = function('json_equals', fp.fingerprint, ?4) " +
"ORDER BY dp.timestamp DESC, dp.dataset.id DESC", DataPointDAO.class)
.setParameter(1, variable)
.setParameter(2, changeTimestamp)
.setParameter(3, nextTimestamp)
.setParameter(4, fpHash)
.setParameter(4, fingerprint, JsonBinaryType.INSTANCE)
.getResultList();
// Last datapoint is already in the list
if (dataPoints.isEmpty()) {
Expand Down Expand Up @@ -727,13 +709,13 @@ WHERE cc.id IN (
}
}
Util.doAfterCommit(tm, () -> {
validateUpTo(variable, fpHash, nextTimestamp);
messageBus.executeForTest(variable.testId, () -> tryRunChangeDetection(variable, fpHash, notify));
validateUpTo(variable, fingerprint, nextTimestamp);
messageBus.executeForTest(variable.testId, () -> tryRunChangeDetection(variable, fingerprint, notify));
});
}

private void validateUpTo(VariableDAO variable, Integer fpHash, Instant timestamp) {
validUpTo.compute(new VarAndFingerprint(variable.id, fpHash), (ignored, current) -> {
private void validateUpTo(VariableDAO variable, JsonNode fingerprint, Instant timestamp) {
validUpTo.compute(new VarAndFingerprint(variable.id, fingerprint), (ignored, current) -> {
log.debugf("Attempt %s, valid up to %s, ", timestamp, current);
if (current == null || !current.timestamp.isAfter(timestamp)) {
return new UpTo(timestamp, true);
Expand Down Expand Up @@ -1351,25 +1333,24 @@ public static class Recalculation {

static final class VarAndFingerprint {
final int varId;
final Integer fp_hash;
final JsonNode fingerprint;

VarAndFingerprint(int varId, Integer fp_hash) {
VarAndFingerprint(int varId, JsonNode fingerprint) {
this.varId = varId;
this.fp_hash = fp_hash;
this.fingerprint = fingerprint;
}


@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
VarAndFingerprint that = (VarAndFingerprint) o;
return varId == that.varId && Objects.equals(fp_hash, that.fp_hash);
return varId == that.varId && Objects.equals(fingerprint, that.fingerprint);
}

@Override
public int hashCode() {
return Objects.hash(varId, fp_hash);
return Objects.hash(varId, fingerprint);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,6 @@ private void createFingerprint(int datasetId, int testId) {
fp.datasetId = datasetId;
fp.dataset = DatasetDAO.findById(datasetId);
fp.fingerprint = fpNode;
fp.fp_hash = fpNode.hashCode();
if(fp.datasetId > 0 && fp.dataset != null)
fp.persist();
}
Expand Down