Skip to content

Commit

Permalink
test(): Writing tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Skraye committed Sep 25, 2023
1 parent 49aed48 commit 2a3d294
Show file tree
Hide file tree
Showing 12 changed files with 268 additions and 4 deletions.
2 changes: 1 addition & 1 deletion cli/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -173,5 +173,5 @@ kestra:
fixed-delay: 1h

heartbeat:
frequency: 5s
frequency: 10s
heartbeat-missed: 3
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.kestra.core.repositories;

import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;

@MicronautTest(transactional = false)
public abstract class AbstractWorkerInstanceRepositoryTest {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package io.kestra.repository.h2;

import io.kestra.jdbc.repository.AbstractJdbcWorkerInstanceRepositoryTest;

public class H2WorkerInstanceRepositoryTest extends AbstractJdbcWorkerInstanceRepositoryTest {
}
9 changes: 9 additions & 0 deletions jdbc-h2/src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ kestra:
type: local
local:
base-path: /tmp/unittest
heartbeat:
frequency: 10s
heartbeat-missed: 3

jdbc:
tables:
Expand Down Expand Up @@ -64,6 +67,12 @@ kestra:
flowtopologies:
table: "flow_topologies"
cls: io.kestra.core.models.topologies.FlowTopology
workerinstance:
cls: io.kestra.core.runners.WorkerInstance
table: "worker_instance"
workerjobrunning:
cls: io.kestra.core.runners.WorkerJobRunning
table: "worker_job_running"

queues:
min-poll-interval: 10ms
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.kestra.repository.mysql;

import io.kestra.jdbc.repository.AbstractJdbcWorkerInstanceRepositoryTest;

public class MysqlWorkerInstanceRepositoryTest extends AbstractJdbcWorkerInstanceRepositoryTest {

}
9 changes: 9 additions & 0 deletions jdbc-mysql/src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ kestra:
type: local
local:
base-path: /tmp/unittest
heartbeat:
frequency: 10s
heartbeat-missed: 3

jdbc:
tables:
Expand Down Expand Up @@ -65,6 +68,12 @@ kestra:
flowtopologies:
table: "flow_topologies"
cls: io.kestra.core.models.topologies.FlowTopology
workerinstance:
cls: io.kestra.core.runners.WorkerInstance
table: "worker_instance"
workerjobrunning:
cls: io.kestra.core.runners.WorkerJobRunning
table: "worker_job_running"

queues:
min-poll-interval: 10ms
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.kestra.repository.postgres;

import io.kestra.jdbc.repository.AbstractJdbcWorkerInstanceRepositoryTest;

public class PostgresWorkerInstanceRepositoryTest extends AbstractJdbcWorkerInstanceRepositoryTest {

}
9 changes: 9 additions & 0 deletions jdbc-postgres/src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ kestra:
type: local
local:
base-path: /tmp/unittest
heartbeat:
frequency: 10s
heartbeat-missed: 3

jdbc:
tables:
Expand Down Expand Up @@ -66,6 +69,12 @@ kestra:
flowtopologies:
table: "flow_topologies"
cls: io.kestra.core.models.topologies.FlowTopology
workerinstance:
cls: io.kestra.core.runners.WorkerInstance
table: "worker_instance"
workerjobrunning:
cls: io.kestra.core.runners.WorkerJobRunning
table: "worker_job_running"

queues:
min-poll-interval: 10ms
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ public Optional<WorkerInstance> heartbeatCheckUp(String workerUuid) {
Optional<WorkerInstance> workerInstance = this.jdbcRepository.fetchOne(select);

if (workerInstance.isPresent()) {
this.save(workerInstance.get().toBuilder().status(WorkerInstance.Status.UP).heartbeatDate(Instant.now()).build());
return workerInstance;
WorkerInstance updatedWorker = workerInstance.get().toBuilder().status(WorkerInstance.Status.UP).heartbeatDate(Instant.now()).build();
this.save(updatedWorker);
return Optional.of(updatedWorker);
}
return Optional.empty();
});
Expand Down Expand Up @@ -119,7 +120,6 @@ public Boolean heartbeatsCleanup() {
return bool.get();
}


@Override
public List<WorkerInstance> findAll() {
return this.jdbcRepository
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package io.kestra.jdbc.repository;

import io.kestra.core.repositories.AbstractWorkerInstanceRepositoryTest;
import io.kestra.core.runners.WorkerInstance;
import io.kestra.jdbc.JdbcTestUtils;
import io.kestra.jdbc.JooqDSLContextWrapper;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;

public abstract class AbstractJdbcWorkerInstanceRepositoryTest extends AbstractWorkerInstanceRepositoryTest {
@Inject
protected AbstractJdbcWorkerInstanceRepository workerInstanceRepository;

@Inject
JdbcTestUtils jdbcTestUtils;

@Inject
protected JooqDSLContextWrapper dslContextWrapper;

@BeforeEach
protected void init() {
jdbcTestUtils.drop();
jdbcTestUtils.migrate();
}

@Test
protected void save() {
WorkerInstance workerInstance = createWorkerInstance(UUID.randomUUID().toString());
workerInstanceRepository.save(workerInstance);

Optional<WorkerInstance> find = workerInstanceRepository.findByWorkerUuid(workerInstance.getWorkerUuid().toString());
assertThat(find.isPresent(), is(true));
assertThat(find.get().getWorkerUuid(), is(workerInstance.getWorkerUuid()));
}

@Test
protected void delete() {
WorkerInstance workerInstance = createWorkerInstance(UUID.randomUUID().toString());
workerInstanceRepository.save(workerInstance);

Optional<WorkerInstance> find = workerInstanceRepository.findByWorkerUuid(workerInstance.getWorkerUuid().toString());
assertThat(find.isPresent(), is(true));
assertThat(find.get().getWorkerUuid(), is(workerInstance.getWorkerUuid()));

workerInstanceRepository.delete(workerInstance);
find = workerInstanceRepository.findByWorkerUuid(workerInstance.getWorkerUuid().toString());
assertThat(find.isPresent(), is(false));
}

@Test
protected void findAll() {
WorkerInstance workerInstance = createWorkerInstance(UUID.randomUUID().toString());
WorkerInstance workerInstanceAlive = createWorkerInstance(UUID.randomUUID().toString());
WorkerInstance workerInstanceDead = createWorkerInstance(UUID.randomUUID().toString(), false);

workerInstanceRepository.save(workerInstance);
workerInstanceRepository.save(workerInstanceAlive);
workerInstanceRepository.save(workerInstanceDead);

List<WorkerInstance> finds = workerInstanceRepository.findAll();
assertThat(finds.size(), is(3));

finds = workerInstanceRepository.findAllAlive();
assertThat(finds.size(), is(2));

finds = workerInstanceRepository.findAllDead();
assertThat(finds.size(), is(1));
}

@Test
protected void find() {
WorkerInstance workerInstance = createWorkerInstance(UUID.randomUUID().toString());
workerInstanceRepository.save(workerInstance);

Optional<WorkerInstance> find = workerInstanceRepository.findByWorkerUuid(workerInstance.getWorkerUuid().toString());
assertThat(find.isPresent(), is(true));
assertThat(find.get().getWorkerUuid(), is(workerInstance.getWorkerUuid()));
}

@Test
protected void heartbeatCheckup() throws InterruptedException {
WorkerInstance workerInstance = createWorkerInstance(UUID.randomUUID().toString());
workerInstanceRepository.save(workerInstance);
CountDownLatch queueCount = new CountDownLatch(1);

queueCount.await(15, TimeUnit.SECONDS);
Optional<WorkerInstance> updatedWorkerInstance = workerInstanceRepository.heartbeatCheckUp(workerInstance.getWorkerUuid().toString());
assertThat(updatedWorkerInstance.isPresent(), is(true));
assertThat(updatedWorkerInstance.get().getHeartbeatDate(), greaterThan(workerInstance.getHeartbeatDate()));
}

@Test
protected void heartbeatsStatusUpdate() {
WorkerInstance workerInstance = createWorkerInstance(UUID.randomUUID().toString());
workerInstance.setHeartbeatDate(Instant.now().minusSeconds(3600));
workerInstanceRepository.save(workerInstance);

workerInstanceRepository.heartbeatsStatusUpdate();
Optional<WorkerInstance> find = workerInstanceRepository.findByWorkerUuid(workerInstance.getWorkerUuid().toString());
assertThat(find.isPresent(), is(true));
assertThat(find.get().getStatus(), is(WorkerInstance.Status.DEAD));
}

@Test
protected void heartbeatsCleanup() {
WorkerInstance workerInstance = createWorkerInstance(UUID.randomUUID().toString());
workerInstance.setHeartbeatDate(Instant.now().minusSeconds(3600));
workerInstance.setStatus(WorkerInstance.Status.DEAD);
workerInstanceRepository.save(workerInstance);

workerInstanceRepository.heartbeatsCleanup();
Optional<WorkerInstance> find = workerInstanceRepository.findByWorkerUuid(workerInstance.getWorkerUuid().toString());
assertThat(find.isPresent(), is(false));
}

private WorkerInstance createWorkerInstance(String workerUuid, Boolean alive) {
return WorkerInstance.builder()
.workerUuid(UUID.fromString(workerUuid))
.workerGroup(null)
.managementPort(0)
.hostname("kestra.io")
.partitions(null)
.port(0)
.status(alive ? WorkerInstance.Status.UP : WorkerInstance.Status.DEAD)
.build();
}

private WorkerInstance createWorkerInstance(String workerUuid) {
return createWorkerInstance(workerUuid, true);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package io.kestra.webserver.controllers;

import io.kestra.core.runners.WorkerInstance;
import io.kestra.jdbc.repository.AbstractJdbcWorkerInstanceRepository;
import io.kestra.webserver.controllers.h2.JdbcH2ControllerTest;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.client.annotation.Client;
import io.micronaut.rxjava2.http.client.RxHttpClient;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.UUID;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

class WorkerInstanceControllerTest extends JdbcH2ControllerTest {
@Inject
@Client("/")
RxHttpClient client;

@Inject
AbstractJdbcWorkerInstanceRepository jdbcWorkerInstanceRepository;

@BeforeEach
protected void init() {
super.setup();
}

@SuppressWarnings("unchecked")
@Test
void list() {
WorkerInstance workerInstance = WorkerInstance.builder()
.workerUuid(UUID.randomUUID())
.workerGroup(null)
.managementPort(0)
.hostname("kestra.io")
.partitions(null)
.port(0)
.status(WorkerInstance.Status.UP)
.build();


jdbcWorkerInstanceRepository.save(workerInstance);

List<WorkerInstance> find = client.toBlocking().retrieve(HttpRequest.GET("/api/v1/workers"), Argument.of(List.class, WorkerInstance.class));
assertThat(find.size(), is(1));
assertThat(find.get(0).getWorkerUuid(), is(workerInstance.getWorkerUuid()));
}

}
10 changes: 10 additions & 0 deletions webserver/src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@ jackson:
FAIL_ON_UNKNOWN_PROPERTIES: false

kestra:
server-type: STANDALONE
storage:
type: local
local:
base-path: /tmp/unittest
heartbeat:
frequency: 10s
heartbeat-missed: 3
anonymous-usage-report:
enabled: true
uri: https://api.kestra.io/v1/reports/usages
Expand Down Expand Up @@ -81,6 +85,12 @@ kestra:
flowtopologies:
table: "flow_topologies"
cls: io.kestra.core.models.topologies.FlowTopology
workerinstance:
cls: io.kestra.core.runners.WorkerInstance
table: "worker_instance"
workerjobrunning:
cls: io.kestra.core.runners.WorkerJobRunning
table: "worker_job_running"
datasources:
h2:
url: jdbc:h2:mem:public;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE
Expand Down

0 comments on commit 2a3d294

Please sign in to comment.