Skip to content
This repository has been archived by the owner on Jul 12, 2023. It is now read-only.

Rewatch #810

Merged
merged 2 commits into from
Nov 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/**
* A {@link DockerRunner} implementation that submits container executions to a Kubernetes cluster.
Expand Down Expand Up @@ -156,6 +157,7 @@ class KubernetesDockerRunner implements DockerRunner {
private final Duration podDeletionDelay;
private final Time time;
private final ExecutorService executor;
private Watch watch;

@VisibleForTesting
KubernetesDockerRunner(String id, Fabric8KubernetesClient client, StateManager stateManager, Stats stats,
Expand Down Expand Up @@ -527,23 +529,21 @@ boolean shouldDeletePod(WorkflowInstance workflowInstance, Pod pod, String reaso

/**
 * Closes the pod watch (if one is held) and then everything registered with {@code closer}.
 *
 * @throws IOException propagated from {@code closer.close()}
 */
@Override
public void close() throws IOException {
  try {
    closeWatch();
  } finally {
    // Always release the registered resources, even if closing the watch throws.
    closer.close();
  }
}

/**
 * Schedules the periodic pod cleanup and tries to open a pod watch.
 *
 * <p>The opened watch is stored in the {@code watch} field so that {@code closeWatch()} can
 * close it later. If opening the watch throws, the failure is logged and the method returns
 * without scheduling pod update processing.
 */
void init() {
  scheduleWithJitter(this::cleanupPods, scheduledExecutor, cleanupPodsInterval);

  final PodWatcher watcher = new PodWatcher();
  try {
    // Assign the field, not a shadowing local: closeWatch() only knows about the field, and
    // close() already calls closeWatch(), so no separate closer registration is needed.
    watch = client.watchPods(watcher);
  } catch (Throwable t) {
    LOG.warn("Failed to watch pods and will rely on polling.", t);
    return;
  }

  scheduleWithJitter(watcher::processPodUpdates, scheduledExecutor, PROCESS_POD_UPDATE_INTERVAL);
}

Expand Down Expand Up @@ -688,6 +688,8 @@ private String readStatus(Pod pod) {

public class PodWatcher implements Watcher<Pod> {

private static final int RECONNECT_DELAY_SECONDS = 1;

private final ConcurrentMap<String, WorkflowInstance> podUpdates = new ConcurrentHashMap<>();

/**
Expand Down Expand Up @@ -753,9 +755,33 @@ private void processPodUpdate(String podName, WorkflowInstance instance) {
emitPodEvents(pod, runState.get());
}

private void reconnect() {
LOG.info("Re-establishing pod watcher");

closeWatch();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not necessarily needed because fabric8 force closes it anyway. But to be on the safe side, we do it again and it is idempotent.


try {
watch = client.watchPods(this);
} catch (Throwable e) {
LOG.warn("Retry threw", e);
scheduleReconnect();
}
}

/**
 * Queues a single {@code reconnect()} run on {@code scheduledExecutor} after
 * {@code RECONNECT_DELAY_SECONDS} seconds.
 */
private void scheduleReconnect() {
  final Runnable reconnectTask = this::reconnect;
  scheduledExecutor.schedule(reconnectTask, RECONNECT_DELAY_SECONDS, TimeUnit.SECONDS);
}

/**
 * Watcher callback invoked when the watch is closed: logs the closure and schedules a
 * reconnect attempt.
 *
 * @param e the exception passed by the watch implementation when it closed
 */
// NOTE(review): a reconnect is scheduled for every close, regardless of the value of e.
// Presumably a null cause means a normal/explicit close in fabric8 — confirm whether
// reconnecting is desired when this runner closes the watch itself.
@Override
public void onClose(KubernetesClientException e) {
LOG.warn("Watch closed", e);
scheduleReconnect();
}
}

/**
 * Closes the currently held pod watch. Does nothing when no watch has been stored yet.
 */
private void closeWatch() {
  if (watch == null) {
    return;
  }
  watch.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,17 +238,32 @@ public void setUp() {
/**
 * Closes the runner under test after each test and checks that doing so closed the mocked
 * pod watch at least once.
 */
@After
public void tearDown() throws Exception {
kdr.close();
// Closing the runner is expected to close the watch it holds.
verify(podWatch, atLeastOnce()).close();
}

@Test
public void shouldFailToInitialize() {
public void shouldReconnectUponWatcherClose() {
Mockito.reset(k8sClient);
when(k8sClient.watchPods(any())).thenThrow(new KubernetesClientException("Forced failure")).thenReturn(podWatch);

podWatcher.onClose(new KubernetesClientException("Forced failure"));

executor.tick(2, TimeUnit.SECONDS);
verify(podWatch, times(2)).close();
verify(k8sClient, times(2)).watchPods(any());
}

@Test
public void shouldFailToInitialize() throws IOException {
var spiedExecutor = spy(executor);
when(k8sClient.watchPods(any())).thenThrow(new KubernetesClientException("Forced failure"));
kdr = new KubernetesDockerRunner(RUNNER_ID, k8sClient, stateManager, stats, serviceAccountSecretManager,
var kdr = new KubernetesDockerRunner(RUNNER_ID, k8sClient, stateManager, stats, serviceAccountSecretManager,
debug, STYX_ENVIRONMENT, SECRET_WHITELIST, POD_CLEANUP_INTERVAL_SECONDS, POD_DELETION_DELAY_SECONDS, time,
spiedExecutor);
kdr.init();
verify(spiedExecutor).schedule(any(Runnable.class), anyLong(), any());
kdr.close();
verify(podWatch, never()).close();
}

@Test
Expand Down