Skip to content

Commit

Permalink
fix(core): flow listeners injection by blocked the startup, use a run
Browse files Browse the repository at this point in the history
  • Loading branch information
tchiotludo committed Feb 4, 2022
1 parent 30ff6ac commit 39d2b32
Show file tree
Hide file tree
Showing 8 changed files with 21 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ public Integer call() throws Exception {
super.call();

FlowListenersInterface flowListeners = applicationContext.getBean(FlowListenersInterface.class);

AtomicReference<ZonedDateTime> lastTime = new AtomicReference<>(ZonedDateTime.now());

flowListeners.run();
flowListeners.listen(flows -> {
long count = flows.stream().filter(flow -> !flow.isDeleted()).count();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
public abstract class AbstractScheduler implements Runnable, AutoCloseable {
protected final ApplicationContext applicationContext;
private final QueueInterface<Execution> executionQueue;
private final FlowListenersInterface flowListeners;
protected final FlowListenersInterface flowListeners;
private final RunContextFactory runContextFactory;
private final MetricRegistry metricRegistry;
private final ConditionService conditionService;
Expand Down Expand Up @@ -91,6 +91,8 @@ public AbstractScheduler(

@Override
public void run() {
flowListeners.run();

ScheduledFuture<?> handle = scheduleExecutor.scheduleAtFixedRate(
this::handle,
0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public DefaultScheduler(
@SuppressWarnings("unchecked")
@Override
public void run() {
flowListeners.run();

QueueInterface<Execution> executionQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.EXECUTION_NAMED));
QueueInterface<Trigger> triggerQueue = applicationContext.getBean(QueueInterface.class, Qualifiers.byName(QueueFactoryInterface.TRIGGER_NAMED));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import java.util.function.Consumer;

public interface FlowListenersInterface {
void run();

void listen(Consumer<List<Flow>> consumer);

List<Flow> flows();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ protected static Flow create(String flowId, String taskId) {
}

public void suite(FlowListenersInterface flowListenersService) {
flowListenersService.run();

AtomicInteger count = new AtomicInteger();
var ref = new Ref();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.core.services.FlowService;
import io.kestra.runner.kafka.serializers.JsonSerde;

import java.util.ArrayList;
Expand All @@ -33,18 +32,20 @@
@KafkaQueueEnabled
public class KafkaFlowListeners implements FlowListenersInterface {
private final KafkaAdminService kafkaAdminService;
private final KafkaStreamService kafkaStreamService;
private SafeKeyValueStore<String, Flow> store;
private final List<Consumer<List<Flow>>> consumers = new ArrayList<>();
private final KafkaStreamService.Stream stream;
private KafkaStreamService.Stream stream;
private List<Flow> flows;

@Inject
public KafkaFlowListeners(
KafkaAdminService kafkaAdminService,
KafkaStreamService kafkaStreamService
) {
public KafkaFlowListeners(KafkaAdminService kafkaAdminService, KafkaStreamService kafkaStreamService) {
this.kafkaAdminService = kafkaAdminService;
this.kafkaStreamService = kafkaStreamService;
}

@Override
public void run() {
kafkaAdminService.createIfNotExist(KafkaStreamSourceService.TOPIC_FLOWLAST);

stream = kafkaStreamService.of(FlowListener.class, FlowListener.class, new FlowListener().topology(), log);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public KafkaStreamService.Stream of(Class<?> clientId, Class<?> groupId, Topolog
}

Stream stream = new Stream(topology, properties, metricsEnabled ? metricRegistry : null, logger);
eventPublisher.publishEventAsync(new KafkaStreamEndpoint.Event(clientId.getName(), stream));
eventPublisher.publishEvent(new KafkaStreamEndpoint.Event(clientId.getName(), stream));

return stream;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@ public MemoryFlowListeners(
@Named(QueueFactoryInterface.FLOW_NAMED) QueueInterface<Flow> flowQueue
) {
this.flowQueue = flowQueue;

this.flows = flowRepository.findAll();
}

@Override
public void run() {
this.flowQueue.receive(flow -> {
if (flow.isDeleted()) {
this.remove(flow);
Expand Down

0 comments on commit 39d2b32

Please sign in to comment.