Skip to content

Commit

Permalink
Merge branch 'master' into retrofit2-upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Jan 29, 2025
2 parents a5ed6b0 + ebba3ba commit 3c25b80
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,29 @@ public void testListRunningTasks() {
.contains(t2.getId());
}

@Test
public void testListByThisInstance() {
Task t1 = subject.create("Test", "STARTED");
Task t2 = subject.create("Test", "STARTED");
Task t3 = subject.create("Test", "STARTED");
Task t4 = subject.create("Test", "STARTED");
Task t5 = subject.create("Test", "STARTED");
String ownerId = ClouddriverHostname.ID;

t3.updateOwnerId("foo@not_this_clouddriver", "Test");
t5.complete();

List<Task> runningTasks = subject.listByThisInstance();

assertThat(runningTasks.stream().allMatch(t -> t.getOwnerId().equals(ownerId))).isTrue();
assertThat(runningTasks.stream().map(Task::getId).collect(Collectors.toList()))
.contains(t1.getId(), t2.getId(), t4.getId());
// Task 3 doesn't belong to this pod and task 5 is not running, so should not be included in the
// result
assertThat(runningTasks.stream().map(Task::getId).collect(Collectors.toList()))
.doesNotContain(t3.getId(), t5.getId());
}

@Test
public void testResultObjectsPersistence() {
Task t1 = subject.create("Test", "Test Status");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.netflix.spinnaker.clouddriver.data.task

import com.netflix.spinnaker.clouddriver.core.ClouddriverHostname

import java.util.concurrent.ConcurrentHashMap

class InMemoryTaskRepository implements TaskRepository {
Expand Down Expand Up @@ -60,7 +62,7 @@ class InMemoryTaskRepository implements TaskRepository {

@Override
List<Task> listByThisInstance() {
return list()
return list().findAll { it.ownerId == ClouddriverHostname.ID }
}

private String getNextId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ import org.jooq.Condition
import org.jooq.DSLContext
import org.jooq.Record
import org.jooq.Select
import org.jooq.impl.DSL
import org.jooq.impl.DSL.field
import org.jooq.impl.DSL.max
import org.jooq.impl.DSL.sql
import org.slf4j.LoggerFactory

Expand Down Expand Up @@ -360,13 +360,29 @@ class SqlTaskRepository(
* Since task statuses are insert-only, we first need to find the most
* recent status record for each task ID and the filter that result set
* down to the ones that are running.
*
* Query used:
* SELECT a.task_id
* FROM task_states AS `a`
* JOIN (
* SELECT task_id, MAX(created_at) AS `created`
* FROM task_states
* GROUP BY task_id
* ) AS `b`
* ON (a.task_id = b.task_id AND a.created_at = b.created)
* JOIN tasks AS `t`
* ON (a.task_id = t.id)
* WHERE (
* t.owner_id = '<clouddriver host name>'
* and a.state = 'STARTED'
* )
*/
private fun runningTaskIds(ctx: DSLContext, thisInstance: Boolean): Array<String> {
return withPool(poolName) {
val baseQuery = ctx.select(field("a.task_id"))
.from(taskStatesTable.`as`("a"))
.innerJoin(
ctx.select(field("task_id"), DSL.max(field("created_at")).`as`("created"))
ctx.select(field("task_id"), max(field("created_at")).`as`("created"))
.from(taskStatesTable)
.groupBy(field("task_id"))
.asTable("b")
Expand All @@ -377,10 +393,10 @@ class SqlTaskRepository(
.innerJoin(tasksTable.`as`("t")).on(sql("a.task_id = t.id"))
.where(
field("t.owner_id").eq(ClouddriverHostname.ID)
.and(field("a.state").eq(TaskState.STARTED.toString()))
.and(field("a.state").eq(STARTED.toString()))
)
} else {
baseQuery.where(field("a.state").eq(TaskState.STARTED.toString()))
baseQuery.where(field("a.state").eq(STARTED.toString()))
}

select
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ class OperationsController {
*/
@PreDestroy
void destroy() {
log.info("Destroy has been triggered. Initiating graceful shutdown of tasks.")
long start = System.currentTimeMillis()
def tasks = taskRepository.listByThisInstance()
while (tasks && !tasks.isEmpty() &&
Expand All @@ -225,6 +226,8 @@ class OperationsController {
if (tasks && !tasks.isEmpty()) {
log.error("Shutting down while tasks '{}' are still in progress!", tasks)
}

log.info("Destruction procedure completed.")
}

private StartOperationResult start(@Nullable String cloudProvider,
Expand Down

0 comments on commit 3c25b80

Please sign in to comment.