Skip to content

Commit

Permalink
[bugfix][cdc-base] Fix cdc base shutdown thread not cleared
Browse files Browse the repository at this point in the history
  • Loading branch information
ic4y committed Mar 10, 2023
1 parent add75d7 commit daa9b4f
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ public interface FetchTask<Split> {
/** Returns current task is running or not. */
boolean isRunning();

/** Close this task */
void shutdown();

/** Returns the split that the task used. */
Split getSplit();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ private void checkReadException() {
@Override
public void close() {
try {
if (snapshotSplitReadTask != null) {
snapshotSplitReadTask.shutdown();
}
if (executorService != null) {
executorService.shutdown();
if (executorService.awaitTermination(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,12 @@ private void checkReadException() {
@Override
public void close() {
try {
if (streamFetchTask != null) {
streamFetchTask.shutdown();
}
if (executorService != null) {
executorService.shutdown();
if (executorService.awaitTermination(
if (!executorService.awaitTermination(
READER_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
log.warn(
"Failed to close the stream fetcher in {} seconds.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ public boolean isRunning() {
return taskRunning;
}

@Override
public void shutdown() {
taskRunning = false;
}

@Override
public SourceSplitBase getSplit() {
return split;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ public boolean isRunning() {
return taskRunning;
}

@Override
public void shutdown() {
taskRunning = false;
}

@Override
public SourceSplitBase getSplit() {
return split;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@ public boolean isRunning() {
return taskRunning;
}

@Override
public void shutdown() {
taskRunning = false;
}

@Override
public SourceSplitBase getSplit() {
return split;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ public boolean isRunning() {
return taskRunning;
}

@Override
public void shutdown() {
taskRunning = false;
}

@Override
public SourceSplitBase getSplit() {
return split;
Expand Down

0 comments on commit daa9b4f

Please sign in to comment.