Skip to content

Commit

Permalink
Add: checksum failure reported at the type level, simple repro for ob…
Browse files Browse the repository at this point in the history
…ject type corruption
  • Loading branch information
Sunjeet committed Feb 5, 2025
1 parent 30e0c8a commit abbf058
Show file tree
Hide file tree
Showing 7 changed files with 328 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -776,9 +776,11 @@ private ReadStateHelper checkIntegrity(
HollowReadStateEngine current = readStates.current().getStateEngine();

log.info("CHECKSUMS");
// System.out.println("// SNAP: TODO: current checksum");
HollowChecksum currentChecksum = HollowChecksum.forStateEngineWithCommonSchemas(current, pending);
log.info(" CUR " + currentChecksum);

// System.out.println("// SNAP: TODO: pending checksum");
HollowChecksum pendingChecksum = HollowChecksum.forStateEngineWithCommonSchemas(pending, current);
log.info(" PND " + pendingChecksum);

Expand All @@ -789,17 +791,19 @@ private ReadStateHelper checkIntegrity(

// FIXME: timt: future cycles will fail unless both deltas validate
applyDelta(artifacts.delta, current);

// System.out.println("// SNAP: TODO: applied delta, now the checksum is (should match pending)");
HollowChecksum forwardChecksum = HollowChecksum.forStateEngineWithCommonSchemas(current, pending);
//out.format(" CUR => PND %s\n", forwardChecksum);
if (!forwardChecksum.equals(pendingChecksum)) {
throw new HollowProducer.ChecksumValidationException(HollowProducer.Blob.Type.DELTA);
throw new HollowProducer.ChecksumValidationException(HollowProducer.Blob.Type.DELTA, forwardChecksum, pendingChecksum);
}

applyDelta(artifacts.reverseDelta, pending);
HollowChecksum reverseChecksum = HollowChecksum.forStateEngineWithCommonSchemas(pending, current);
//out.format(" CUR <= PND %s\n", reverseChecksum);
if (!reverseChecksum.equals(currentChecksum)) {
throw new HollowProducer.ChecksumValidationException(HollowProducer.Blob.Type.REVERSE_DELTA);
throw new HollowProducer.ChecksumValidationException(HollowProducer.Blob.Type.REVERSE_DELTA, reverseChecksum, currentChecksum);
}
if (!schemaChangedFromPriorVersion) {
// optimization - they have identical schemas, so just swap them
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.netflix.hollow.core.write.HollowWriteStateEngine;
import com.netflix.hollow.core.write.objectmapper.HollowObjectMapper;
import com.netflix.hollow.core.write.objectmapper.RecordPrimaryKey;
import com.netflix.hollow.tools.checksum.HollowChecksum;
import com.netflix.hollow.tools.compact.HollowCompactor;
import java.io.File;
import java.io.IOException;
Expand All @@ -42,6 +43,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.Executor;

/**
Expand Down Expand Up @@ -329,8 +331,17 @@ public void removeListener(HollowProducerEventListener listener) {
public static final class ChecksumValidationException extends IllegalStateException {
private static final long serialVersionUID = -4399719849669674206L;

ChecksumValidationException(Blob.Type type) {
super(type.name() + " checksum invalid");
ChecksumValidationException(Blob.Type blobType, HollowChecksum applied, HollowChecksum pending) {
super(blobType.name() + " has invalid checksums: " + differences(applied, pending));
}

// symmetric difference between the 2 sets
static String differences(HollowChecksum applied, HollowChecksum pending) {
List<HollowChecksum.TypeChecksum> uniqueToApplied = new ArrayList<>(applied.getSortedTypeChecksums());
List<HollowChecksum.TypeChecksum> uniqueToPending = new ArrayList<>(pending.getSortedTypeChecksums());
uniqueToApplied.removeAll(pending.getSortedTypeChecksums());
uniqueToPending.removeAll(applied.getSortedTypeChecksums());
return "types unique in the applied state are " + uniqueToApplied + ", and unique in the pending state state=" + uniqueToPending;
}
}
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ public void reshard(HollowTypeReadState typeState, int prevNumShards, int newNum
// a join of original data elements.
}
} catch (Exception e) {
throw new RuntimeException("Error in re-sharding", e);
throw new RuntimeException(String.format("Failed to reshard %s type %s. Requires a snapshot load to recover from.",
typeState.getSchema().getSchemaType(), typeState.getSchema().getName()), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ public void calculateSnapshot() {
int size = VarInt.readVInt(data, readPointer);
readPointer += VarInt.sizeOfVInt(size);

int numBuckets = HashCodes.hashTableSize(size);
int numBuckets = HashCodes.hashTableSize(size);// SNAP: TODO: perhaps a unit test for 0 bucket size / empty map split/join

mapPointersAndSizesArray[shardNumber].setElementValue(((long)bitsPerMapFixedLengthPortion * shardOrdinal) + bitsPerMapPointer, bitsPerMapSizeValue, size);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.netflix.hollow.core.schema.HollowSchema;
import com.netflix.hollow.core.util.SimultaneousExecutor;
import java.util.Collections;
import java.util.Objects;
import java.util.Vector;

/**
Expand All @@ -32,9 +33,26 @@
public class HollowChecksum {

private int currentChecksum = 0;
private Vector<TypeChecksum> sortedTypeChecksums;

public void setSortedTypeChecksums(Vector<TypeChecksum> sortedTypeChecksums) {
this.sortedTypeChecksums = sortedTypeChecksums;
}

public Vector<TypeChecksum> getSortedTypeChecksums() {
return sortedTypeChecksums;
}

public HollowChecksum() { }

public void applyType(TypeChecksum typeChecksum) {
if (this.sortedTypeChecksums == null) {
this.sortedTypeChecksums = new Vector<>();
}
this.sortedTypeChecksums.addElement(typeChecksum);
applyInt(typeChecksum.checksum);
}

public void applyInt(int value) {
currentChecksum ^= HashCodes.hashInt(value);
currentChecksum = HashCodes.hashInt(currentChecksum);
Expand Down Expand Up @@ -95,16 +113,16 @@ public void run() {
Collections.sort(typeChecksums);

HollowChecksum totalChecksum = new HollowChecksum();

for(TypeChecksum cksum : typeChecksums) {
totalChecksum.applyInt(cksum.getChecksum());
// System.out.println(" // SNAP: TODO: remove" + cksum.type + "= " + cksum.checksum);
totalChecksum.applyType(cksum);
}

return totalChecksum;
}


private static class TypeChecksum implements Comparable<TypeChecksum>{
public static class TypeChecksum implements Comparable<TypeChecksum>{
private final String type;
private final int checksum;

Expand All @@ -121,5 +139,26 @@ public int getChecksum() {
public int compareTo(TypeChecksum other) {
return type.compareTo(other.type);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TypeChecksum that = (TypeChecksum) o;
return checksum == that.checksum && Objects.equals(type, that.type);
}

@Override
public int hashCode() {
return Objects.hash(type, checksum);
}

@Override
public String toString() {
return "TypeChecksum{" +
"type='" + type + '\'' +
", checksum=" + checksum +
'}';
}
}
}
Loading

0 comments on commit abbf058

Please sign in to comment.