From b20de6924e23b09c1fb6809359cb685181bbf1cd Mon Sep 17 00:00:00 2001 From: Karan Kumar Date: Fri, 21 Feb 2025 14:00:33 +0530 Subject: [PATCH] Revert "Fix error where communication failures to k8s can lead to stuck tasks (#17431)" (#17747) This reverts commit 8850023811b27eeb25095fa9d4ce96ede9081b52. --- .../k8s/overlord/KubernetesPeonLifecycle.java | 18 ++++++----- .../overlord/common/KubernetesPeonClient.java | 3 +- .../overlord/KubernetesPeonLifecycleTest.java | 31 ++++++++++++------- .../common/KubernetesPeonClientTest.java | 3 +- 4 files changed, 32 insertions(+), 23 deletions(-) diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java index e20e10ec3411..f9eb89ff1dce 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java @@ -42,7 +42,6 @@ import org.apache.druid.tasklogs.TaskLogs; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; -import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.charset.Charset; @@ -351,20 +350,23 @@ protected void startWatchingLogs() protected void saveLogs() { try { - final Path file = Files.createTempFile(taskId.getOriginalTaskId(), "log"); + Path file = Files.createTempFile(taskId.getOriginalTaskId(), "log"); try { - final InputStream logStream; + startWatchingLogs(); if (logWatch != null) { - logStream = logWatch.getOutput(); + FileUtils.copyInputStreamToFile(logWatch.getOutput(), file.toFile()); } else { - logStream = kubernetesClient.getPeonLogs(taskId).or( - new ByteArrayInputStream(StringUtils.format( + log.debug("Log stream not found for %s", taskId.getOriginalTaskId()); + FileUtils.writeStringToFile( + file.toFile(), + StringUtils.format( "Peon for task [%s] did not report any logs. Check k8s metrics and events for the pod to see what happened.", taskId - ).getBytes(StandardCharsets.UTF_8)) + ), + Charset.defaultCharset() ); + } - FileUtils.copyInputStreamToFile(logStream, file.toFile()); taskLogs.pushTaskLog(taskId.getOriginalTaskId(), file.toFile()); } catch (IOException e) { diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java index 00cf93ba992d..63487e4e373e 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java @@ -25,7 +25,6 @@ import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.dsl.LogWatch; -import org.apache.druid.error.DruidException; import org.apache.druid.indexing.common.task.IndexTaskUtils; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.java.util.common.RetryUtils; @@ -267,7 +266,7 @@ Pod getPeonPodWithRetries(KubernetesClient client, String jobName, int quietTrie ); } catch (Exception e) { - throw DruidException.defensive(e, "Error when looking for K8s pod with label: job-name=%s", jobName); + throw new KubernetesResourceNotFoundException("K8s pod with label: job-name=" + jobName + " not found"); } } diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java index 85ea7072b857..e12e6c7ab72b 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java +++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java @@ -46,10 +46,8 @@ import org.junit.Test; import org.junit.runner.RunWith; -import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; -import java.io.InputStream; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; import java.util.concurrent.ExecutionException; @@ -60,9 +58,7 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport { private static final String ID = "id"; - private static final String IP = "ip"; private static final TaskStatus SUCCESS = TaskStatus.success(ID); - private static final InputStream LOG_INPUT_STREAM = new ByteArrayInputStream("logs for task".getBytes(StandardCharsets.UTF_8)); @Mock KubernetesPeonClient kubernetesClient; @Mock TaskLogs taskLogs; @@ -328,7 +324,7 @@ public void test_join_withoutJob_returnsFailedTaskStatus() throws IOException EasyMock.anyLong(), EasyMock.eq(TimeUnit.MILLISECONDS) )).andReturn(new JobResponse(null, PeonPhase.FAILED)); - EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.absent()); + EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.absent()); EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.absent()); taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); @@ -378,7 +374,7 @@ public void test_join() throws IOException EasyMock.anyLong(), EasyMock.eq(TimeUnit.MILLISECONDS) )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); - EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.of(LOG_INPUT_STREAM)); + EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.of( IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), StandardCharsets.UTF_8) )); @@ -387,6 +383,8 @@ public void test_join() throws IOException stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID); EasyMock.expectLastCall().once(); stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID); + EasyMock.expectLastCall().once(); + logWatch.close(); EasyMock.expectLastCall(); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); @@ -426,7 +424,7 @@ public void test_join_whenCalledMultipleTimes_raisesIllegalStateException() thro EasyMock.anyLong(), EasyMock.eq(TimeUnit.MILLISECONDS) )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); - EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.of(LOG_INPUT_STREAM)).times(2); + EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn( Optional.of(IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), StandardCharsets.UTF_8)) ); @@ -434,10 +432,14 @@ public void test_join_whenCalledMultipleTimes_raisesIllegalStateException() thro EasyMock.expectLastCall(); taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); EasyMock.expectLastCall(); + logWatch.close(); + EasyMock.expectLastCall(); stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID); EasyMock.expectLastCall().once(); stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID); EasyMock.expectLastCall().once(); + logWatch.close(); + EasyMock.expectLastCall(); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); @@ -482,7 +484,7 @@ public void test_join_withoutTaskStatus_returnsFailedTaskStatus() throws IOExcep EasyMock.anyLong(), EasyMock.eq(TimeUnit.MILLISECONDS) )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); - EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.absent()); + EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.absent()); EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.absent()); taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); EasyMock.expectLastCall(); @@ -530,7 +532,7 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskStatus_returnsFaile EasyMock.anyLong(), EasyMock.eq(TimeUnit.MILLISECONDS) )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); - EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.of(LOG_INPUT_STREAM)); + EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); EasyMock.expect(taskLogs.streamTaskStatus(ID)).andThrow(new IOException()); taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); EasyMock.expectLastCall(); @@ -538,6 +540,9 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskStatus_returnsFaile EasyMock.expectLastCall().once(); stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID); EasyMock.expectLastCall().once(); + logWatch.close(); + EasyMock.expectLastCall(); + Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); replayAll(); @@ -577,7 +582,7 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskLogs_isIgnored() th EasyMock.anyLong(), EasyMock.eq(TimeUnit.MILLISECONDS) )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); - EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.of(LOG_INPUT_STREAM)); + EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn( Optional.of(IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), StandardCharsets.UTF_8)) ); @@ -587,6 +592,8 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskLogs_isIgnored() th EasyMock.expectLastCall().once(); stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID); EasyMock.expectLastCall().once(); + logWatch.close(); + EasyMock.expectLastCall(); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); @@ -618,13 +625,15 @@ public void test_join_whenRuntimeExceptionThrownWhileWaitingForKubernetesJob_thr )).andThrow(new RuntimeException()); // We should still try to push logs - EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.of(LOG_INPUT_STREAM)); + EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); EasyMock.expectLastCall(); stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID); EasyMock.expectLastCall().once(); stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID); EasyMock.expectLastCall().once(); + logWatch.close(); + EasyMock.expectLastCall(); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java index 42ec881dbc5b..fa0da14fab73 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java +++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java @@ -30,7 +30,6 @@ import io.fabric8.kubernetes.client.dsl.LogWatch; import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer; -import org.apache.druid.error.DruidException; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.metrics.StubServiceEmitter; @@ -526,7 +525,7 @@ void test_getPeonPodWithRetries_withPod_returnsPod() void test_getPeonPodWithRetries_withoutPod_raisesKubernetesResourceNotFoundException() { Assertions.assertThrows( - DruidException.class, + KubernetesResourceNotFoundException.class, () -> instance.getPeonPodWithRetries(clientApi.getClient(), new K8sTaskId(ID).getK8sJobName(), 1, 1), StringUtils.format("K8s pod with label: job-name=%s not found", ID) );