Skip to content

Commit

Permalink
[FLINK-36868][state/forst] Fix the deadlock when quit forst state bac…
Browse files Browse the repository at this point in the history
…kend
  • Loading branch information
Zakelly committed Jan 22, 2025
1 parent 52d6691 commit baf5b7d
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 15 deletions.
2 changes: 1 addition & 1 deletion flink-dist/src/main/resources/META-INF/NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ This project bundles the following dependencies under the Apache Software Licens
- com.google.code.findbugs:jsr305:1.3.9
- com.twitter:chill-java:0.7.6
- com.ververica:frocksdbjni:8.10.0-ververica-beta-1.0
- com.ververica:forstjni:0.1.5
- com.ververica:forstjni:0.1.6
- commons-cli:commons-cli:1.5.0
- commons-collections:commons-collections:3.2.2
- commons-io:commons-io:2.15.1
Expand Down
2 changes: 1 addition & 1 deletion flink-state-backends/flink-statebackend-forst/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ under the License.
<dependency>
<groupId>com.ververica</groupId>
<artifactId>forstjni</artifactId>
<version>0.1.5</version>
<version>0.1.6</version>
</dependency>

<!-- test dependencies -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.forstdb.FlinkEnv;
import org.forstdb.IndexType;
import org.forstdb.PlainTableConfig;
import org.forstdb.Priority;
import org.forstdb.ReadOptions;
import org.forstdb.Statistics;
import org.forstdb.TableFormatConfig;
Expand Down Expand Up @@ -85,8 +84,6 @@ public final class ForStResourceContainer implements AutoCloseable {
// and the db data dir's absolute path will be used as the log file name's prefix.
private static final int INSTANCE_PATH_LENGTH_LIMIT = 255 - FORST_RELOCATE_LOG_SUFFIX.length();

@Nullable private FlinkEnv flinkEnv = null;

@Nullable private final Path remoteBasePath;

@Nullable private final Path remoteForStPath;
Expand Down Expand Up @@ -239,11 +236,12 @@ public DBOptions getDbOptions() {
// configured,
// fallback to local directory currently temporarily.
if (remoteForStPath != null) {
flinkEnv =
FlinkEnv flinkEnv =
new FlinkEnv(
remoteBasePath.toString(),
new StringifiedForStFileSystem(forStFileSystem));
opt.setEnv(flinkEnv);
handlesToClose.add(flinkEnv);
}

return opt;
Expand Down Expand Up @@ -469,15 +467,6 @@ public void close() throws Exception {
sharedResources.close();
}
cleanRelocatedDbLogs();
if (flinkEnv != null) {
// There is something wrong with the FlinkEnv, the background threads won't quit during
// the disposal of DB. We explicit shrink the thread pool here until the ForSt repo
// fixes that.
flinkEnv.setBackgroundThreads(0, Priority.LOW);
flinkEnv.setBackgroundThreads(0, Priority.HIGH);
flinkEnv.close();
flinkEnv = null;
}
}

/**
Expand Down

0 comments on commit baf5b7d

Please sign in to comment.