Skip to content

Commit

Permalink
Revert "Fix error where communication failures to k8s can lead to stu…
Browse files Browse the repository at this point in the history
…ck tasks (#17431)"

This reverts commit 8850023.
  • Loading branch information
cryptoe committed Feb 20, 2025
1 parent 2316d21 commit dd23f4c
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.apache.druid.tasklogs.TaskLogs;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
Expand Down Expand Up @@ -351,20 +350,23 @@ protected void startWatchingLogs()
protected void saveLogs()
{
try {
final Path file = Files.createTempFile(taskId.getOriginalTaskId(), "log");
Path file = Files.createTempFile(taskId.getOriginalTaskId(), "log");
try {
final InputStream logStream;
startWatchingLogs();
if (logWatch != null) {
logStream = logWatch.getOutput();
FileUtils.copyInputStreamToFile(logWatch.getOutput(), file.toFile());
} else {
logStream = kubernetesClient.getPeonLogs(taskId).or(
new ByteArrayInputStream(StringUtils.format(
log.debug("Log stream not found for %s", taskId.getOriginalTaskId());
FileUtils.writeStringToFile(
file.toFile(),
StringUtils.format(
"Peon for task [%s] did not report any logs. Check k8s metrics and events for the pod to see what happened.",
taskId
).getBytes(StandardCharsets.UTF_8))
),
Charset.defaultCharset()
);

}
FileUtils.copyInputStreamToFile(logStream, file.toFile());
taskLogs.pushTaskLog(taskId.getOriginalTaskId(), file.toFile());
}
catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.LogWatch;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.RetryUtils;
Expand Down Expand Up @@ -267,7 +266,7 @@ Pod getPeonPodWithRetries(KubernetesClient client, String jobName, int quietTrie
);
}
catch (Exception e) {
throw DruidException.defensive(e, "Error when looking for K8s pod with label: job-name=%s", jobName);
throw new KubernetesResourceNotFoundException("K8s pod with label: job-name=" + jobName + " not found");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,8 @@
import org.junit.Test;
import org.junit.runner.RunWith;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;
Expand All @@ -60,9 +58,7 @@
public class KubernetesPeonLifecycleTest extends EasyMockSupport
{
private static final String ID = "id";
private static final String IP = "ip";
private static final TaskStatus SUCCESS = TaskStatus.success(ID);
private static final InputStream LOG_INPUT_STREAM = new ByteArrayInputStream("logs for task".getBytes(StandardCharsets.UTF_8));

@Mock KubernetesPeonClient kubernetesClient;
@Mock TaskLogs taskLogs;
Expand Down Expand Up @@ -328,7 +324,7 @@ public void test_join_withoutJob_returnsFailedTaskStatus() throws IOException
EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS)
)).andReturn(new JobResponse(null, PeonPhase.FAILED));
EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.absent());
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.absent());
EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.absent());

taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
Expand Down Expand Up @@ -378,7 +374,7 @@ public void test_join() throws IOException
EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS)
)).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED));
EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.of(LOG_INPUT_STREAM));
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch));
EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.of(
IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), StandardCharsets.UTF_8)
));
Expand All @@ -387,6 +383,8 @@ public void test_join() throws IOException
stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID);
EasyMock.expectLastCall().once();
stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();

Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());
Expand Down Expand Up @@ -426,18 +424,22 @@ public void test_join_whenCalledMultipleTimes_raisesIllegalStateException() thro
EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS)
)).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED));
EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.of(LOG_INPUT_STREAM)).times(2);
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch));
EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(
Optional.of(IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), StandardCharsets.UTF_8))
);
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
EasyMock.expectLastCall();
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
EasyMock.expectLastCall();
logWatch.close();
EasyMock.expectLastCall();
stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID);
EasyMock.expectLastCall().once();
stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();

Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());

Expand Down Expand Up @@ -482,7 +484,7 @@ public void test_join_withoutTaskStatus_returnsFailedTaskStatus() throws IOExcep
EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS)
)).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED));
EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.absent());
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.absent());
EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.absent());
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
EasyMock.expectLastCall();
Expand Down Expand Up @@ -530,14 +532,17 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskStatus_returnsFaile
EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS)
)).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED));
EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.of(LOG_INPUT_STREAM));
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch));
EasyMock.expect(taskLogs.streamTaskStatus(ID)).andThrow(new IOException());
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
EasyMock.expectLastCall();
stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID);
EasyMock.expectLastCall().once();
stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();

Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());

replayAll();
Expand Down Expand Up @@ -577,7 +582,7 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskLogs_isIgnored() th
EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS)
)).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED));
EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.of(LOG_INPUT_STREAM));
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch));
EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(
Optional.of(IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), StandardCharsets.UTF_8))
);
Expand All @@ -587,6 +592,8 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskLogs_isIgnored() th
EasyMock.expectLastCall().once();
stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();

Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());

Expand Down Expand Up @@ -618,13 +625,15 @@ public void test_join_whenRuntimeExceptionThrownWhileWaitingForKubernetesJob_thr
)).andThrow(new RuntimeException());

// We should still try to push logs
EasyMock.expect(kubernetesClient.getPeonLogs(k8sTaskId)).andReturn(Optional.of(LOG_INPUT_STREAM));
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch));
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
EasyMock.expectLastCall();
stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID);
EasyMock.expectLastCall().once();
stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();

Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import io.fabric8.kubernetes.client.dsl.LogWatch;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
Expand Down Expand Up @@ -526,7 +525,7 @@ void test_getPeonPodWithRetries_withPod_returnsPod()
void test_getPeonPodWithRetries_withoutPod_raisesKubernetesResourceNotFoundException()
{
Assertions.assertThrows(
DruidException.class,
KubernetesResourceNotFoundException.class,
() -> instance.getPeonPodWithRetries(clientApi.getClient(), new K8sTaskId(ID).getK8sJobName(), 1, 1),
StringUtils.format("K8s pod with label: job-name=%s not found", ID)
);
Expand Down

0 comments on commit dd23f4c

Please sign in to comment.