Merge pull request #5 from soundvibe/remove-in-initializer
Entries can now be removed from the writer and the initializer.
soundvibe authored Dec 3, 2019
2 parents 4b1eb0e + aaabc6e commit 876b09a
Showing 15 changed files with 268 additions and 46 deletions.
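
To see the new API end to end, the sketch below writes a store, removes a key, and reads it back. This is illustrative only, not code from this commit; the `PalDB.createWriter`/`createReader` factories and `PalDBConfigBuilder` are assumed from the fork's public API, while the `remove(K)` call is what this PR adds.

```java
import java.io.File;
import com.linkedin.paldb.api.*;

public class RemoveDemo {
    public static void main(String[] args) {
        var file = new File("store.paldb");
        // Assumed config-builder API; any Configuration<String, String> would do.
        var config = PalDBConfigBuilder.<String, String>create().build();
        try (var writer = PalDB.createWriter(file, config)) {
            writer.put("keep", "v1");
            writer.put("drop", "v2");
            writer.remove("drop"); // new in this PR: records a tombstone for the key
        }
        try (var reader = PalDB.createReader(file, config)) {
            System.out.println(reader.get("keep")); // v1
            System.out.println(reader.get("drop")); // null -- the tombstone reads back as null
        }
    }
}
```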
4 changes: 2 additions & 2 deletions README.md
@@ -97,12 +97,12 @@ PalDB is available on Maven Central, hence just add the following dependency:
<dependency>
<groupId>net.soundvibe</groupId>
<artifactId>paldb</artifactId>
<version>2.1.0</version>
<version>2.1.1</version>
</dependency>
```
Scala SBT
```
libraryDependencies += "net.soundvibe" % "paldb" % "2.1.0"
libraryDependencies += "net.soundvibe" % "paldb" % "2.1.1"
```


2 changes: 1 addition & 1 deletion pom.xml
@@ -6,7 +6,7 @@

<groupId>net.soundvibe</groupId>
<artifactId>paldb</artifactId>
<version>2.1.0</version>
<version>2.1.1</version>
<packaging>jar</packaging>
<name>paldb</name>
<description>Embeddable persistent key-value store</description>
2 changes: 2 additions & 0 deletions src/main/java/com/linkedin/paldb/api/StoreInitializer.java
@@ -9,6 +9,8 @@ public interface StoreInitializer<K,V> extends AutoCloseable {

void put(K key, V value);

void remove(K key);

@Override
void close();
}
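
`StoreInitializer` is the bulk-load path, so `remove` there lets an entry be retracted before the store is ever published. A hedged sketch, assuming the initializer is handed out by a `StoreRW#init()` factory as in the fork's examples (that factory is an assumption and is not shown in this diff; only `remove(K)` comes from this commit):

```java
import java.io.File;
import com.linkedin.paldb.api.*;

class InitRemoveDemo {
    static void loadAndPrune(File file, Configuration<String, String> config) {
        // store.init() is assumed here; remove(K) is what this commit adds.
        try (StoreRW<String, String> store = PalDB.createRW(file, config)) {
            try (StoreInitializer<String, String> init = store.init()) {
                init.put("a", "1");
                init.put("b", "2");
                init.remove("a"); // "a" never becomes visible to readers
            }
            System.out.println(store.get("a")); // null
            System.out.println(store.get("b")); // 2
        }
    }
}
```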
6 changes: 6 additions & 0 deletions src/main/java/com/linkedin/paldb/api/StoreWriter.java
@@ -81,4 +81,10 @@ default void putAll(K[] keys, V[] values) {
* null
*/
void put(byte[] key, byte[] value);

/**
* Removes the key from the store.
* @param key Key to be removed
*/
void remove(K key);
}
4 changes: 3 additions & 1 deletion src/main/java/com/linkedin/paldb/impl/ReaderImpl.java
@@ -16,6 +16,7 @@

import com.linkedin.paldb.api.*;
import com.linkedin.paldb.api.errors.StoreClosed;
import com.linkedin.paldb.impl.StorageSerialization.RemovedValue;
import com.linkedin.paldb.utils.DataInputOutput;
import org.slf4j.*;

@@ -100,7 +101,8 @@ public V get(K key, V defaultValue) {
try {
byte[] valueBytes = storage.get(serialization.serializeKey(key));
if (valueBytes != null) {
return serialization.deserializeValue(new DataInputOutput(valueBytes));
var value = serialization.deserializeValue(new DataInputOutput(valueBytes));
return value == RemovedValue.INSTANCE ? null : value;
} else {
return defaultValue;
}
19 changes: 12 additions & 7 deletions src/main/java/com/linkedin/paldb/impl/StorageReader.java
@@ -27,6 +27,7 @@
import java.time.Instant;
import java.util.*;

import static com.linkedin.paldb.impl.StorageSerialization.REMOVED_ID;
import static com.linkedin.paldb.utils.DataInputOutput.*;


@@ -43,6 +44,7 @@ public class StorageReader implements Iterable<Map.Entry<byte[], byte[]>> {
private final long keyCount;
// Key count for each key length
private final long[] keyCounts;
private final long[] actualKeyCounts;
// Slot size for each key length
private final int[] slotSizes;
// Number of slots for each key length
@@ -151,6 +153,7 @@ public class StorageReader implements Iterable<Map.Entry<byte[], byte[]>> {
indexOffsets = new long[maxKeyLength + 1];
dataOffsets = new long[maxKeyLength + 1];
keyCounts = new long[maxKeyLength + 1];
actualKeyCounts = new long[maxKeyLength + 1];
slots = new long[maxKeyLength + 1];
slotSizes = new int[maxKeyLength + 1];

@@ -159,6 +162,7 @@
int keyLength = dataInputStream.readInt();

keyCounts[keyLength] = dataInputStream.readLong();
actualKeyCounts[keyLength] = dataInputStream.readLong();
slots[keyLength] = dataInputStream.readLong();
slotSizes[keyLength] = dataInputStream.readInt();
indexOffsets[keyLength] = dataInputStream.readLong();
@@ -407,8 +411,8 @@ public StorageIterator(boolean value) {
}

private void nextKeyLength() {
for (int i = currentKeyLength + 1; i < keyCounts.length; i++) {
long c = keyCounts[i];
for (int i = currentKeyLength + 1; i < actualKeyCounts.length; i++) {
long c = actualKeyCounts[i];
if (c > 0) {
currentKeyLength = i;
keyLimit += c;
@@ -436,14 +440,15 @@ public FastEntry next() {
}

byte[] key = Arrays.copyOf(currentSlotBuffer, currentKeyLength);
byte[] value = null;
long valueOffset = currentDataOffset + offset;
var value = mMapData ? getMMapBytes(valueOffset) : getDiskBytes(valueOffset);

if (withValue) {
long valueOffset = currentDataOffset + offset;
value = mMapData ? getMMapBytes(valueOffset) : getDiskBytes(valueOffset);
boolean isRemoved = (value.length > 0 && (((int) value[0] & 0xff) == REMOVED_ID));
if (isRemoved) {
return next();
}

entry.set(key, value);
entry.set(key, withValue ? value : null);

if (++keyIndex == keyLimit) {
nextKeyLength();
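
The iterator hunk above treats any value whose first byte is `REMOVED_ID` as a tombstone and skips it by recursing into `next()`. A standalone model of the same filtering, written as a wrapping iterator (illustrative only; the real class reads slots from mmap'd buffers, and a loop also avoids one stack frame per consecutive tombstone):

```java
import java.util.*;

// Values whose first byte is REMOVED_ID (1) are tombstones and must not surface.
class SkipTombstones implements Iterator<byte[]> {
    static final int REMOVED_ID = 1;
    private final Iterator<byte[]> raw;
    private byte[] next;

    SkipTombstones(Iterator<byte[]> raw) { this.raw = raw; advance(); }

    private void advance() {
        next = null;
        while (raw.hasNext()) {
            byte[] v = raw.next();
            boolean removed = v.length > 0 && ((v[0] & 0xff) == REMOVED_ID);
            if (!removed) { next = v; return; } // tombstone: keep scanning
        }
    }

    @Override public boolean hasNext() { return next != null; }

    @Override public byte[] next() {
        if (next == null) throw new NoSuchElementException();
        byte[] out = next;
        advance();
        return out;
    }
}
```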
21 changes: 15 additions & 6 deletions src/main/java/com/linkedin/paldb/impl/StorageSerialization.java
@@ -118,8 +118,14 @@ boolean isCompressionEnabled() {

// SERIALIZATION

private static final int NULL_ID = -1;
public static class RemovedValue {

public static final RemovedValue INSTANCE = new RemovedValue();

}

private static final int NULL = 0;
public static final int REMOVED_ID = 1;
private static final int BOOLEAN_TRUE = 2;
private static final int BOOLEAN_FALSE = 3;
private static final int INTEGER_MINUS_1 = 4;
@@ -217,6 +223,8 @@ private <T> void serialize(final DataOutput out, final Object obj, boolean compr

if (obj == null) {
out.write(NULL);
} else if (obj == RemovedValue.INSTANCE) {
out.write(REMOVED_ID);
} else if (serializer != null) {
out.write(compress ? CUSTOM_C : CUSTOM);
var bytes = serializer.write((T) obj);
@@ -694,7 +702,7 @@ private <T> T deserialize(byte[] buf, Serializer<T> serializer) throws IOExcepti
DataInputOutput bs = new DataInputOutput(buf);
Object ret = deserialize(bs, serializer);
if (bs.available() != 0) {
throw new RuntimeException("bytes left: " + bs.available());
throw new IOException("bytes left: " + bs.available());
}

return (T) ret;
@@ -731,6 +739,9 @@ private <T> T deserialize(DataInput is, Serializer<T> serializer) throws IOExcep
break;
case NULL:
break;
case REMOVED_ID:
ret = RemovedValue.INSTANCE;
break;
case BOOLEAN_TRUE:
ret = Boolean.TRUE;
break;
@@ -980,9 +991,7 @@ private <T> T deserialize(DataInput is, Serializer<T> serializer) throws IOExcep
case ARRAY_OBJECT:
ret = deserializeArrayObject(is, serializer);
break;
case -1:
throw new EOFException();

default: throw new EOFException();
}
return (T) ret;
}
@@ -1101,7 +1110,7 @@ private static String[] deserializeStringArray(DataInput is) throws IOException
return ret;
}

private static final byte[] EMPTY_BYTES = new byte[0];
public static final byte[] EMPTY_BYTES = new byte[0];

private static byte[] deserializeByteArray(DataInput is) throws IOException {
int size = LongPacker.unpackInt(is);
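
The codec change reserves type tag 1 (`REMOVED_ID`) for tombstones: the writer emits the bare tag with no payload, and the reader maps it back to the `RemovedValue` singleton, which `ReaderImpl.get` then compares by identity to return `null`. A stripped-down sketch of just that round trip (the real codec handles dozens of other tags):

```java
import java.io.*;

final class TombstoneCodecSketch {
    static final int NULL = 0;
    static final int REMOVED_ID = 1;            // same tag value as in this PR
    static final Object REMOVED = new Object(); // stand-in for RemovedValue.INSTANCE

    static void write(DataOutput out, Object value) throws IOException {
        if (value == null) out.write(NULL);
        else if (value == REMOVED) out.write(REMOVED_ID); // tag only, no payload
        else throw new IOException("the real codec serializes other types here");
    }

    static Object read(DataInput in) throws IOException {
        int tag = in.readUnsignedByte();
        switch (tag) {
            case NULL:       return null;
            case REMOVED_ID: return REMOVED; // callers translate this to null
            default:         throw new EOFException("unknown tag " + tag);
        }
    }
}
```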
54 changes: 40 additions & 14 deletions src/main/java/com/linkedin/paldb/impl/StorageWriter.java
@@ -26,6 +26,7 @@
import java.text.DecimalFormat;
import java.util.*;

import static com.linkedin.paldb.impl.StorageSerialization.*;
import static com.linkedin.paldb.utils.DataInputOutput.*;

/**
@@ -58,6 +59,7 @@ public class StorageWriter {
// Number of keys
private long keyCount;
private long[] keyCounts;
private long[] actualKeyCounts;
// Number of values
private long valueCount;
// Number of collisions
@@ -87,9 +89,12 @@ public class StorageWriter {
dataLengths = new long[0];
maxOffsetLengths = new int[0];
keyCounts = new long[0];
actualKeyCounts = new long[0];
segmentSize = config.getLong(Configuration.MMAP_SEGMENT_SIZE);
duplicatesEnabled = config.getBoolean(Configuration.ALLOW_DUPLICATES);
}
}

private static final byte[] REMOVED_BYTES = new byte[] {REMOVED_ID};

public void put(byte[] key, byte[] value) throws IOException {
int keyLength = key.length;
@@ -113,28 +118,37 @@ public void put(byte[] key, byte[] value) throws IOException {
// Write offset and record max offset length
int offsetDataLength = LongPacker.packLong(indexStream, dataLength);
maxOffsetLengths[keyLength] = Math.max(offsetDataLength, maxOffsetLengths[keyLength]);
byte[] val;
if (value == null) {
val = REMOVED_BYTES;
LongPacker.packInt(indexStream, REMOVED_ID);
} else {
val = value;
LongPacker.packInt(indexStream, 0);
}

// Write if data is not the same
if (!sameValue) {
// Get stream
DataOutputStream dataStream = getDataStream(keyLength);

// Write size and value
int valueSize = LongPacker.packInt(dataStream, value.length);
dataStream.write(value);
int valueSize = LongPacker.packInt(dataStream, val.length);
dataStream.write(val);

// Update data length
dataLengths[keyLength] += valueSize + value.length;
dataLengths[keyLength] += valueSize + val.length;

// Update last value
lastValues[keyLength] = value;
lastValuesLength[keyLength] = valueSize + value.length;
lastValues[keyLength] = val;
lastValuesLength[keyLength] = valueSize + val.length;

valueCount++;
}

keyCount++;
keyCounts[keyLength]++;
actualKeyCounts[keyLength]++;
}

public void close() throws IOException {
@@ -241,6 +255,7 @@ private void writeMetadata(RandomAccessFile dataOutputStream, BloomFilter bloomF

// Write key count
dataOutputStream.writeLong(keyCounts[i]);
dataOutputStream.writeLong(actualKeyCounts[i]);

// Write slot count
long slots = Math.round(keyCounts[i] / loadFactor);
@@ -289,15 +304,15 @@ private MappedByteBuffer[] initIndexBuffers(FileChannel channel, long indexSize)

private File buildIndex(int keyLength, BloomFilter bloomFilter) throws IOException {
long count = keyCounts[keyLength];
long duplicateCount = 0L;
long numSlots = Math.round(count / loadFactor);
int offsetLength = maxOffsetLengths[keyLength];
int slotSize = keyLength + offsetLength;

// Init index
var indexFile = new File(tempFolder, "index" + keyLength + ".dat");
try (RandomAccessFile indexAccessFile = new RandomAccessFile(indexFile, "rw")) {
try (var indexAccessFile = new RandomAccessFile(indexFile, "rw")) {
indexAccessFile.setLength(numSlots * slotSize);
try (FileChannel indexChannel = indexAccessFile.getChannel()) {
try (var indexChannel = indexAccessFile.getChannel()) {
var indexBuffers = initIndexBuffers(indexChannel, indexAccessFile.length());
// Init reading stream
File tempIndexFile = indexFiles[keyLength];
@@ -313,19 +328,24 @@ private File buildIndex(int keyLength, BloomFilter bloomFilter) throws IOExcepti

// Read offset
long offsetData = LongPacker.unpackLong(tempIndexStream);

int head = LongPacker.unpackInt(tempIndexStream);
boolean isRemoved = head == REMOVED_ID;
// Hash
long hash = Murmur3.hash(keyBuffer);
if (bloomFilter != null) {
bloomFilter.add(keyBuffer);
}

boolean collision = false;
for (long probe = 0; probe < count; probe++) {
long slot = ((hash + probe) % numSlots);
getFromBuffers(indexBuffers, slot * slotSize, slotBuffer, slotSize, segmentSize);
long found = LongPacker.unpackLong(slotBuffer, keyLength);
if (found == 0) {
if (found == 0L) {

if (isRemoved) {
duplicateCount++;
break;
}
// The spot is empty use it
putIntoBuffers(indexBuffers, slot * slotSize, keyBuffer, keyBuffer.length, segmentSize);
int pos = LongPacker.packLong(offsetBuffer, offsetData);
@@ -335,10 +355,13 @@ private File buildIndex(int keyLength, BloomFilter bloomFilter) throws IOExcepti
collision = true;
// Check for duplicates
if (isKey(slotBuffer, keyBuffer)) {
if (duplicatesEnabled) {
if (isRemoved || duplicatesEnabled) {
putIntoBuffers(indexBuffers, slot * slotSize, keyBuffer, keyBuffer.length, segmentSize);
int pos = LongPacker.packLong(offsetBuffer, offsetData);
putIntoBuffers(indexBuffers, (slot * slotSize) + keyBuffer.length, offsetBuffer, pos, segmentSize);
duplicateCount++;
if (isRemoved) duplicateCount++;
break;
} else {
throw new DuplicateKeyException(
String.format("A duplicate key has been found for key bytes %s", Arrays.toString(keyBuffer)));
@@ -359,7 +382,7 @@ private File buildIndex(int keyLength, BloomFilter bloomFilter) throws IOExcepti

} finally {
unmap(indexBuffers);
if (tempIndexFile.delete()) {
log.info("Temporary index file {} has been deleted", tempIndexFile.getName());
}
}
@@ -368,6 +391,8 @@ private File buildIndex(int keyLength, BloomFilter bloomFilter) throws IOExcepti
Files.deleteIfExists(indexFile.toPath());
throw e;
}
keyCount -= duplicateCount;
actualKeyCounts[keyLength] -= duplicateCount;
return indexFile;
}

@@ -463,6 +488,7 @@ private DataOutputStream getIndexStream(int keyLength) throws IOException {
indexStreams = copyOfIndexStreams;
indexFiles = Arrays.copyOf(indexFiles, keyLength + 1);
keyCounts = Arrays.copyOf(keyCounts, keyLength + 1);
actualKeyCounts = Arrays.copyOf(actualKeyCounts, keyLength + 1);
maxOffsetLengths = Arrays.copyOf(maxOffsetLengths, keyLength + 1);
lastValues = Arrays.copyOf(lastValues, keyLength + 1);
lastValuesLength = Arrays.copyOf(lastValuesLength, keyLength + 1);
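
The `buildIndex` changes are where tombstones actually disappear: a tombstone that probes into an empty slot is dropped outright, one that finds its key overwrites the slot, and `duplicateCount` grows accordingly so that `keyCount` ends up counting only live, distinct keys. A simplified in-memory model of that bookkeeping (a `HashMap` stands in for the mmap'd slot buffers and linear probing; this is an interpretation of the hunk, not the PR's code):

```java
import java.util.*;

class IndexBuildSketch {
    record Rec(String key, long offset, boolean removed) {}

    // Returns the number of records that do not survive as live keys,
    // playing the role of duplicateCount; the caller does keyCount -= dropped.
    static long apply(Map<String, Long> index, List<Rec> recs) {
        long dropped = 0;
        for (Rec r : recs) {
            boolean seen = index.containsKey(r.key());
            if (!seen) {
                if (r.removed()) { dropped++; continue; } // tombstone for an absent key
                index.put(r.key(), r.offset());           // empty slot: place the key
            } else {
                index.put(r.key(), r.offset());           // overwrite: the last record wins
                dropped++;                                // the extra record is not a live key
                if (r.removed()) dropped++;               // ...and it kills the key it found
            }
        }
        return dropped;
    }
}
```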
11 changes: 2 additions & 9 deletions src/main/java/com/linkedin/paldb/impl/StoreImpl.java
@@ -61,14 +61,7 @@ public static <K,V> StoreWriter<K,V> createWriter(File file, Configuration<K,V>
}
try {
log.info("Initialize writer from file {}", file.getName());
File parent = file.getParentFile();
if (parent != null && !parent.exists()) {
if (parent.mkdirs()) {
log.info("Creating directories for path {}", file.getName());
} else {
throw new IOException(String.format("Couldn't create directory %s", parent));
}
}
Files.createDirectories(file.isDirectory() ? file.toPath() : file.toPath().toAbsolutePath().getParent());
return new WriterImpl<>(config, file);
} catch (IOException ex) {
throw new UncheckedIOException(ex);
Expand All @@ -89,7 +82,7 @@ public static <V, K> StoreRW<K, V> createRW(File file, Configuration<K,V> config
}
log.info("Initialize RW from file {}", file.getName());
try {
Files.createDirectories(file.isDirectory() ? file.toPath() : file.toPath().getParent());
Files.createDirectories(file.isDirectory() ? file.toPath() : file.toPath().toAbsolutePath().getParent());
return new StoreRWImpl<>(config, file);
} catch (IOException e) {
throw new UncheckedIOException(e);
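
The `StoreImpl` fix is subtle: `Path#getParent()` returns `null` for a bare relative path, so passing it straight to `Files.createDirectories` would throw a `NullPointerException`; applying `toAbsolutePath()` first guarantees a parent. A quick illustration of the behavior being worked around:

```java
import java.nio.file.Path;

class ParentPathDemo {
    public static void main(String[] args) {
        Path p = Path.of("store.paldb");
        System.out.println(p.getParent());                  // null -- no parent component
        System.out.println(p.toAbsolutePath().getParent()); // the current working directory
    }
}
```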