Skip to content

Commit

Permalink
control-service: logs endpoint doesn't hang (#2370)
Browse files Browse the repository at this point in the history
# Why 
Closes: #2041 
If a job is running and we make a request for its logs, the control
service hangs and, after a very long time, returns an error.



# What
The error was happening because we were reading the logs from Kubernetes
using a function that returns a streaming InputStream.
Kubernetes was streaming the logs to the control plane and does not
close the input stream until the job is completed.

Previously we were using
```java
new PodLogs(client).streamNamespacedPodLog(...)
```
Within this function Kubernetes calls
```java
new CoreV1Api(client).readNamespacedPodLogCall(... , follow=true)
```

Instead of calling `PodLogs.streamNamespacedPodLog`, I now call
`CoreV1Api.readNamespacedPodLogCall` directly and set follow=false.
Now the function returns immediately. 


# How has this been tested
Integration test.
I have created a job that runs for 20 minutes. I query for its logs and
make sure that it returns within 5 seconds.




Signed-off-by: murphp15 <[email protected]>

---------

Co-authored-by: github-actions <>
  • Loading branch information
murphp15 authored Jul 10, 2023
1 parent d7c1f94 commit 62e2da9
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright 2021-2023 VMware, Inc.
* SPDX-License-Identifier: Apache-2.0
*/

package com.vmware.taurus.datajobs.it;

import com.vmware.taurus.ControlplaneApplication;
import com.vmware.taurus.controlplane.model.data.DataJobExecution;
import com.vmware.taurus.datajobs.it.common.BaseIT;
import com.vmware.taurus.datajobs.it.common.DataJobDeploymentExtension;
import com.vmware.taurus.datajobs.it.common.JobExecutionUtil;
import com.vmware.taurus.service.execution.JobExecutionService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.testcontainers.shaded.org.awaitility.Awaitility;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static com.vmware.taurus.datajobs.it.common.JobExecutionUtil.testDataJobExecutionRead;

/**
 * Integration test verifying that the logs of a data job execution that is still running can be
 * read quickly, i.e. that the logs call does not block until the job finishes.
 */
@Slf4j
@SpringBootTest(
    webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
    classes = ControlplaneApplication.class)
public class DataJobViewLogsIT extends BaseIT {

  /** Maximum time a single logs read may take before the attempt is considered hung. */
  private static final long LOGS_READ_TIMEOUT_SECONDS = 5;

  @Autowired private JobExecutionService executionService;

  // simple_job_read_logs.zip contains a job that would run for more than 30 minutes if allowed
  @RegisterExtension
  static DataJobDeploymentExtension dataJobDeploymentExtension =
      new DataJobDeploymentExtension("simple_job_read_logs.zip");

  /**
   * Starts an execution of the deployed job, checks that it is reported as RUNNING and then polls
   * for its logs until one read completes within the per-read time limit.
   *
   * <p>The parameters are presumably resolved by {@link DataJobDeploymentExtension} - confirm
   * against the extension.
   */
  @Test
  public void testJobLogsViewing(
      String jobName, String username, String deploymentId, String teamName) throws Exception {
    // manually start job execution
    ImmutablePair<String, String> executeDataJobResult =
        JobExecutionUtil.executeDataJob(jobName, teamName, username, deploymentId, mockMvc);
    String executionId = executeDataJobResult.getRight();
    String opId = executeDataJobResult.getLeft();
    testDataJobExecutionRead(
        executionId,
        DataJobExecution.StatusEnum.RUNNING,
        opId,
        jobName,
        teamName,
        username,
        mockMvc);
    Awaitility.await()
        .atMost(6, TimeUnit.MINUTES)
        // Keep polling only while the pod is not up yet; any other failure (including a timed
        // out logs read) is propagated and fails the test.
        .ignoreExceptionsMatching((e) -> isPodNotReadyYet(e))
        .until(
            () -> {
              getLogsFast(jobName, teamName, executionId);
              return true;
            });
  }

  /**
   * Tells whether a failed logs read was caused by the pod not being up yet.
   *
   * <p>It is only by looking 3 exceptions deep that we can tell the reason the logs read failed.
   * If 400 is in the error message then the pod is not up yet and we should keep trying to poll
   * for the logs. Exceptions with a shorter cause chain (for example a {@link TimeoutException},
   * which has no cause) or without a message never match, instead of triggering a {@link
   * NullPointerException} that would mask the original failure.
   *
   * @param e the exception thrown by a polling attempt
   * @return {@code true} if the message of the cause's cause contains "400"
   */
  private static boolean isPodNotReadyYet(Throwable e) {
    Throwable cause = e.getCause();
    if (cause == null) {
      return false;
    }
    Throwable rootReason = cause.getCause();
    if (rootReason == null || rootReason.getMessage() == null) {
      return false;
    }
    return rootReason.getMessage().contains("400");
  }

  /**
   * We are testing that the server returns logs quickly.
   *
   * <p>The logs read runs on a dedicated worker thread so that it can be bounded by a timeout. The
   * executor is always shut down afterwards: this releases the worker thread after every attempt
   * and interrupts a read that is still in progress when the timeout expires.
   *
   * @throws TimeoutException if the logs are not returned within the time limit
   * @throws ExecutionException if the logs read itself fails
   * @throws InterruptedException if the calling thread is interrupted while waiting
   */
  private void getLogsFast(String jobName, String teamName, String executionId)
      throws InterruptedException, ExecutionException, TimeoutException {
    var executor = Executors.newSingleThreadExecutor();
    try {
      executor
          .submit(
              () -> {
                executionService.getJobExecutionLogs(teamName, jobName, executionId, 0);
              })
          .get(LOGS_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS);
    } finally {
      executor.shutdownNow();
    }
  }
}
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.kubernetes.client.util.Yaml;
import lombok.*;
import net.javacrumbs.shedlock.spring.annotation.EnableSchedulerLock;
import okhttp3.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
Expand Down Expand Up @@ -914,18 +915,15 @@ public Optional<String> getJobLogs(String jobName, Integer tailLines)
var pods = listJobPods(job.get());
if (pods.size() > 0) {
var pod = pods.get(pods.size() - 1);
PodLogs podLogs = new PodLogs(client);

try (BufferedReader br =
new BufferedReader(
new InputStreamReader(
podLogs.streamNamespacedPodLog(
readNamespacedPodLog(
pod.getMetadata().getNamespace(),
pod.getMetadata().getName(),
pod.getSpec().getContainers().get(0).getName(),
null,
tailLines,
true),
tailLines),
Charsets.UTF_8))) {
String logs = br.lines().collect(Collectors.joining(System.lineSeparator()));
return Optional.of(logs);
Expand All @@ -935,6 +933,28 @@ public Optional<String> getJobLogs(String jobName, Integer tailLines)
return Optional.empty();
}

/**
 * Requests the log of a pod container and returns the response body as a stream.
 *
 * <p>This mirrors the Kubernetes client class PodLogs; the only difference is that the request is
 * made with follow=false.
 *
 * @param namespace namespace of the pod
 * @param name name of the pod
 * @param container name of the container whose log is read
 * @param tailLines value passed through as the tailLines argument of the log request
 * @return the body of the successful log response as a byte stream
 * @throws ApiException if the response is not successful; carries the HTTP status code
 * @throws IOException if executing the HTTP call fails
 */
private InputStream readNamespacedPodLog(
    String namespace, String name, String container, Integer tailLines)
    throws ApiException, IOException {
  var logCall =
      new CoreV1Api(client)
          .readNamespacedPodLogCall(
              name, namespace, container, false, null, null, "false", false, null, tailLines,
              true, null);
  Response logResponse = logCall.execute();

  if (logResponse.isSuccessful()) {
    return logResponse.body().byteStream();
  }

  // Release the failed response before reporting the error.
  if (logResponse.body() != null) {
    logResponse.close();
  }
  throw new ApiException(logResponse.code(), "Logs request failed: " + logResponse.code());
}

public JobStatusCondition watchJob(
String jobName, int timeoutSeconds, Consumer<JobStatus> watcher)
throws ApiException, IOException, InterruptedException {
Expand Down

0 comments on commit 62e2da9

Please sign in to comment.