Skip to content

Commit

Permalink
explicitly close socat socket (#12632)
Browse files Browse the repository at this point in the history
* explicitly close socat socket

* source can be closed multiple times

because AirbyteSource is an AutoCloseable
close should be idempotent though so calling it multiple times is fine
  • Loading branch information
git-phu authored and suhomud committed May 23, 2022
1 parent 50e223b commit a25233c
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,13 @@ private static Runnable getReplicationRunnable(final AirbyteSource source,
if (recordsRead % 1000 == 0) {
LOGGER.info("Records read: {} ({})", recordsRead, FileUtils.byteCountToDisplaySize(messageTracker.getTotalBytesEmitted()));
}
} else {
LOGGER.info("Source has no more messages, closing connection.");
try {
source.close();
} catch (final Exception e) {
throw new SourceException("Source cannot be stopped!", e);
}
}
}
LOGGER.info("Total records read: {} ({})", recordsRead, FileUtils.byteCountToDisplaySize(messageTracker.getTotalBytesEmitted()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ public KubePodProcess(final boolean isOrchestrator,
final Container relayStdout = new ContainerBuilder()
.withName("relay-stdout")
.withImage(socatImage)
.withCommand("sh", "-c", String.format("cat %s | socat -d -d - TCP:%s:%s", STDOUT_PIPE_FILE, processRunnerHost, stdoutLocalPort))
.withCommand("sh", "-c", String.format("cat %s | socat -d -d -t 60 - TCP:%s:%s", STDOUT_PIPE_FILE, processRunnerHost, stdoutLocalPort))
.withVolumeMounts(pipeVolumeMount, terminationVolumeMount)
.withResources(sidecarResources)
.withImagePullPolicy(sidecarImagePullPolicy)
Expand All @@ -463,7 +463,7 @@ public KubePodProcess(final boolean isOrchestrator,
final Container relayStderr = new ContainerBuilder()
.withName("relay-stderr")
.withImage(socatImage)
.withCommand("sh", "-c", String.format("cat %s | socat -d -d - TCP:%s:%s", STDERR_PIPE_FILE, processRunnerHost, stderrLocalPort))
.withCommand("sh", "-c", String.format("cat %s | socat -d -d -t 60 - TCP:%s:%s", STDERR_PIPE_FILE, processRunnerHost, stderrLocalPort))
.withVolumeMounts(pipeVolumeMount, terminationVolumeMount)
.withResources(sidecarResources)
.withImagePullPolicy(sidecarImagePullPolicy)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -133,7 +134,7 @@ void test() throws Exception {
verify(destination).start(destinationConfig, jobRoot);
verify(destination).accept(RECORD_MESSAGE1);
verify(destination).accept(RECORD_MESSAGE2);
verify(source).close();
verify(source, atLeastOnce()).close();
verify(destination).close();
}

Expand Down

0 comments on commit a25233c

Please sign in to comment.