Skip to content

Commit

Permalink
Merge pull request #7 from soundvibe/2.1.3
Browse files Browse the repository at this point in the history
Fixed deadlock when trying to update store while iterating.
  • Loading branch information
soundvibe authored May 12, 2020
2 parents 4e7c844 + a94eb8a commit d6e9459
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 33 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ dist: trusty
matrix:
include:
- jdk: openjdk13
- jdk: openjdk14

after_success:
- bash <(curl -s https://codecov.io/bash)
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
Change Log
==========
Version 2.1.3
--------------------------
- Fixed possible deadlock when trying to update store while iterating

Version 2.1.1
--------------------------
- Added `remove(key)` to `StoreWriter<K,V>` and `StoreInitializer<K,V>`
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,12 @@ PalDB is available on Maven Central, hence just add the following dependency:
<dependency>
<groupId>net.soundvibe</groupId>
<artifactId>paldb</artifactId>
<version>2.1.2</version>
<version>2.1.3</version>
</dependency>
```
Scala SBT
```
libraryDependencies += "net.soundvibe" % "paldb" % "2.1.2"
libraryDependencies += "net.soundvibe" % "paldb" % "2.1.3"
```


Expand Down
8 changes: 4 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>net.soundvibe</groupId>
<artifactId>paldb</artifactId>
<version>2.1.2</version>
<version>2.1.3</version>
<packaging>jar</packaging>
<name>paldb</name>
<description>Embeddable persistent key-value store</description>
Expand Down Expand Up @@ -61,7 +61,7 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.28</version>
<version>1.7.30</version>
</dependency>

<!--testing-->
Expand Down Expand Up @@ -121,7 +121,7 @@
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M3</version>
<version>2.22.2</version>
<configuration>
<includes>
<include>**/Test*.java</include>
Expand Down Expand Up @@ -172,7 +172,7 @@
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>0.8.4</version>
<version>0.8.5</version>
<executions>
<execution>
<goals>
Expand Down
37 changes: 16 additions & 21 deletions src/main/java/com/linkedin/paldb/impl/StoreRWImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ public synchronized CompletableFuture<Map.Entry<K,V>> flushAsync() {
log.info("Compacting {}, size: {}", file, file.length());
var tempFile = FileUtils.createTempFile("tmp_", EXT_PALDB);
try (var writer = new WriterImpl<>(config, tempFile)) {
Iterable<Map.Entry<K,V>> iter = () -> new RWEntryIterator<>(reader.get(), entries, null);
Iterable<Map.Entry<K,V>> iter = () -> new RWEntryIterator<>(reader.get(), entries, rwLock);
for (var keyValue : iter) {
writer.put(keyValue.getKey(), keyValue.getValue());
}
Expand Down Expand Up @@ -303,15 +303,13 @@ public long size() {
@Override
public Stream<Map.Entry<K, V>> stream() {
var iterator = iterator();
return StreamSupport.stream(iterator.spliterator(), false)
.onClose(iterator::close);
return StreamSupport.stream(iterator.spliterator(), false);
}

@Override
public Stream<K> streamKeys() {
var iterator = keys();
return StreamSupport.stream(iterator.spliterator(), false)
.onClose(iterator::close);
return StreamSupport.stream(iterator.spliterator(), false);
}

private void invokeOnCompacted(Map.Entry<K,V> lastEntry, File storeFile) {
Expand Down Expand Up @@ -362,7 +360,7 @@ public Map.Entry<K,V> next() {

@Override
public Iterator<Map.Entry<K, V>> iterator() {
return this;
return new RWEntryIterator<>(reader, buffer, rwLock);
}
}

Expand All @@ -381,19 +379,21 @@ public K next() {

@Override
public Iterator<K> iterator() {
return this;
return new RWKeyIterator<>(reader, buffer, rwLock);
}
}

private abstract static class RWIterator<K,V> implements AutoCloseable {
private final Map<K, V> buffer;
private final ReentrantReadWriteLock rwLock;
private abstract static class RWIterator<K,V> {
final ReaderImpl<K, V> reader;
final Map<K, V> buffer;
final ReentrantReadWriteLock rwLock;
private Iterator<Map.Entry<K,V>> iterator;
Boolean checkedHasNext;
private boolean startTheSecond;
Map.Entry<K,V> nextValue;

RWIterator(ReaderImpl<K, V> reader, Map<K, V> buffer, ReentrantReadWriteLock rwLock) {
this.reader = reader;
this.buffer = buffer;
this.rwLock = rwLock;
this.iterator = reader.iterator();
Expand All @@ -405,16 +405,14 @@ public boolean hasNext() {
return checkedHasNext;
}

@Override
public void close() {
if (startTheSecond && rwLock != null) {
rwLock.readLock().unlock();
}
}

void doNext() {
if (iterator.hasNext()) {
nextValue = iterator.next();
rwLock.readLock().lock();
try {
nextValue = iterator.next();
} finally {
rwLock.readLock().unlock();
}
if (!startTheSecond) {
if (buffer.containsKey(nextValue.getKey())) {
doNext();
Expand All @@ -431,9 +429,6 @@ void doNext() {
checkedHasNext = false;
else {
startTheSecond = true;
if (rwLock != null) {
rwLock.readLock().lock();
}
iterator = buffer.entrySet().iterator();
doNext();
}
Expand Down
74 changes: 68 additions & 6 deletions src/test/java/com/linkedin/paldb/impl/StoreRWImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
import java.io.*;
import java.nio.file.*;
import java.time.Duration;
import java.util.*;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.stream.Collectors;

import static java.util.stream.Collectors.toList;
import static org.junit.jupiter.api.Assertions.*;

@DisplayNameGeneration(DisplayNameGenerator.ReplaceUnderscores.class)
Expand Down Expand Up @@ -490,6 +490,11 @@ void should_read_and_put_using_50_threads(@TempDir Path tempDir) throws Interrup
assertEquals("any value 2", store.get(5));
assertEquals(name, store.get(id));
}

try (var stream = store.stream()) {
stream.forEach(e -> assertEquals(e.getValue(), store.get(e.getKey())));
}

} catch (Throwable error){
error.printStackTrace();
success.set(false);
Expand Down Expand Up @@ -520,8 +525,9 @@ void should_not_return_same_key_when_iterating(@TempDir Path tempDir) {
sut.put("any", "updated value");

try (var stream = sut.stream()) {
var any = stream.filter(e -> e.getKey().equals("any"))
.collect(Collectors.toList());
var any = stream
.filter(e -> e.getKey().equals("any"))
.collect(toList());

assertEquals(1, any.size());
assertEquals("any", any.get(0).getKey());
Expand All @@ -530,14 +536,70 @@ void should_not_return_same_key_when_iterating(@TempDir Path tempDir) {

sut.remove("other");
try (var stream = sut.stream()) {
var any = stream.filter(e -> e.getKey().equals("other"))
.collect(Collectors.toList());
var any = stream
.filter(e -> e.getKey().equals("other"))
.collect(toList());

assertEquals(0, any.size());
}
}
}

@Test
void should_be_able_to_update_store_while_iterating(@TempDir Path tempDir) throws InterruptedException {
var file = tempDir.resolve("testIterate.paldb");

try (var sut = new StoreRWImpl<>(PalDBConfigBuilder.<String,String>create()
.withEnableWriteAutoFlush(true)
.build(),
file.toFile())) {

try (var init = sut.init()) {
init.put("any", "value");
init.put("other", "value2");
}

sut.put("any", "updated value");

try (var stream = sut.stream()) {
var any = stream
.peek(e -> System.out.println("Iterating: " + e))
.filter(e -> e.getKey().equals("any"))
.peek(e -> {
var future = CompletableFuture.runAsync(() -> {
System.out.println("Starting update from separate thread");
sut.put("new2", "value2");
System.out.println("Removing from separate thread");
sut.remove("other");
System.out.println("Finished updating from separate thread");
});
System.out.println("Starting update");
sut.put("new", "value");
System.out.println("Removing");
sut.remove("other");
System.out.println("Finished updating");
future.join();
})
.collect(toList());

assertEquals(1, any.size());
assertEquals("any", any.get(0).getKey());
assertEquals("updated value", any.get(0).getValue());
}

sut.remove("other");
try (var stream = sut.stream()) {
var any = stream
.filter(e -> e.getKey().equals("other"))
.collect(toList());

assertEquals(0, any.size());
}

assertEquals("value2", sut.get("new2"));
}
}

@Test
@Disabled
@Tag("performance")
Expand Down

0 comments on commit d6e9459

Please sign in to comment.