Skip to content

Commit

Permalink
feat(cli): reindex all flows command (#1846)
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu authored Aug 8, 2023
1 parent 61ffdd1 commit be58d0e
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 1 deletion.
52 changes: 52 additions & 0 deletions cli/src/main/java/io/kestra/cli/commands/sys/ReindexCommand.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package io.kestra.cli.commands.sys;

import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.services.TaskDefaultService;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;

import java.util.List;

@CommandLine.Command(
name = "reindex",
description = "reindex all records of a type: read them from the database then update them",
mixinStandardHelpOptions = true,
subcommands = {
RestoreQueueCommand.class,
FlowListenersRestoreCommand.class
}
)
@Slf4j
public class ReindexCommand extends AbstractCommand {
@Inject
private ApplicationContext applicationContext;

@CommandLine.Option(names = {"-t", "--type"}, description = "The type of the records to reindex, only 'flow' is supported for now.")
private String type;

@Override
public Integer call() throws Exception {
super.call();

if ("flow".equals(type)) {
FlowRepositoryInterface flowRepository = applicationContext.getBean(FlowRepositoryInterface.class);
TaskDefaultService taskDefaultService = applicationContext.getBean(TaskDefaultService.class);

List<FlowWithSource> flows = flowRepository.findWithSource(null, null, null);
flows.forEach(flow -> flowRepository.update(flow.toFlow(), flow.toFlow(), flow.getSource(), taskDefaultService.injectDefaults(flow.toFlow())));

stdOut("Successfully reindex " + flows.size() + " flow(s).");
}
else {
throw new IllegalArgumentException("Reindexing type '" + type + "' is not supported");
}

return 0;
}
}
3 changes: 2 additions & 1 deletion cli/src/main/java/io/kestra/cli/commands/sys/SysCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
mixinStandardHelpOptions = true,
subcommands = {
RestoreQueueCommand.class,
FlowListenersRestoreCommand.class
FlowListenersRestoreCommand.class,
ReindexCommand.class
}
)
@Slf4j
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package io.kestra.cli.commands.sys;

import io.kestra.cli.commands.flows.namespaces.FlowNamespaceUpdateCommand;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import io.micronaut.runtime.server.EmbeddedServer;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.URL;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.StringContains.containsString;

class ReindexCommandTest {
@Test
void reindexFlow() {
URL directory = ReindexCommandTest.class.getClassLoader().getResource("flows");
ByteArrayOutputStream out = new ByteArrayOutputStream();
System.setOut(new PrintStream(out));

try (ApplicationContext ctx = ApplicationContext.run(Environment.CLI, Environment.TEST)) {
EmbeddedServer embeddedServer = ctx.getBean(EmbeddedServer.class);
embeddedServer.start();

// we use the update command to add flows to extract
String[] updateArgs = {
"--server",
embeddedServer.getURL().toString(),
"--user",
"myuser:pass:word",
"io.kestra.cli",
directory.getPath(),
};
PicocliRunner.call(FlowNamespaceUpdateCommand.class, ctx, updateArgs);
assertThat(out.toString(), containsString("3 flow(s)"));

// then we reindex them
String[] reindexArgs = {
"--type",
"flow",
};
Integer call = PicocliRunner.call(ReindexCommand.class, ctx, reindexArgs);
assertThat(call, is(0));
assertThat(out.toString(), containsString("Successfully reindex 3 flow(s)."));
}
}
}

0 comments on commit be58d0e

Please sign in to comment.