Skip to content

Commit

Permalink
concord-agent-operator: use informers API
Browse files Browse the repository at this point in the history
  • Loading branch information
ibodrov committed Jan 28, 2025
1 parent ee2d45c commit 9320ea2
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,69 +25,74 @@
import com.walmartlabs.concord.agentoperator.crd.AgentPool;
import com.walmartlabs.concord.agentoperator.crd.AgentPoolList;
import com.walmartlabs.concord.agentoperator.scheduler.AutoScalerFactory;
import com.walmartlabs.concord.agentoperator.scheduler.Event;
import com.walmartlabs.concord.agentoperator.scheduler.Scheduler;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.concurrent.Executors;

import static com.walmartlabs.concord.agentoperator.scheduler.Event.Type.DELETED;
import static com.walmartlabs.concord.agentoperator.scheduler.Event.Type.MODIFIED;

public class Operator {

private static final Logger log = LoggerFactory.getLogger(Operator.class);

private static final long RESYNC_PERIOD = Duration.ofSeconds(10).toMillis();

public static void main(String[] args) {
// TODO support overloading the CRD with an external file?

var namespace = System.getenv("WATCH_NAMESPACE");
if (namespace == null) {
namespace = "default";
}

var client = new DefaultKubernetesClient() // NOSONAR
.inNamespace(namespace);

var namespace = getEnv("WATCH_NAMESPACE", "default");
var baseUrl = getEnv("CONCORD_BASE_URL", "http://192.168.99.1:8001"); // use minikube/vbox host's default address
var apiToken = getEnv("CONCORD_API_TOKEN", null);
var useMaintenanceMode = Boolean.parseBoolean(getEnv("USE_AGENT_MAINTENANCE_MODE", "false"));

// TODO use secrets for the token?
var cfg = new Scheduler.Configuration(baseUrl, apiToken);
var client = new DefaultKubernetesClient().inNamespace(namespace);
var autoScalerFactory = new AutoScalerFactory(cfg, client);
var agentClientFactory = new AgentClientFactory(useMaintenanceMode);
var executor = Executors.newCachedThreadPool();

var scheduler = new Scheduler(autoScalerFactory, client, agentClientFactory);
scheduler.start();

log.info("main -> my watch begins... (namespace={})", namespace);

var dummyClient = client.resources(AgentPool.class, AgentPoolList.class);
var informer = client.resources(AgentPool.class, AgentPoolList.class).inAnyNamespace()
.inform(new ResourceEventHandler<>() {

@Override
public void onAdd(AgentPool resource) {
executor.submit(() -> scheduler.onEvent(MODIFIED, resource));
}

@Override
public void onUpdate(AgentPool oldResource, AgentPool newResource) {
if (oldResource == newResource) {
return;
}
executor.submit(() -> scheduler.onEvent(MODIFIED, newResource));
}

@Override
public void onDelete(AgentPool resource, boolean deletedFinalStateUnknown) {
executor.submit(() -> scheduler.onEvent(DELETED, resource));
}
}, RESYNC_PERIOD);

try {
dummyClient.watch(new Watcher<>() {
@Override
public void eventReceived(Action action, AgentPool resource) {
scheduler.onEvent(actionToEvent(action), resource);
}

@Override
public void onClose(WatcherException we) {
log.error("Watcher exception {}", we.getMessage(), we);
}
});
informer.run();
} catch (Exception e) {
log.error("Error while watching for CRs (namespace={})", namespace, e);
System.exit(2);
}
}

private static Event.Type actionToEvent(Watcher.Action action) {
return switch (action) {
case ADDED, MODIFIED -> Event.Type.MODIFIED;
case DELETED -> Event.Type.DELETED;
default -> throw new IllegalArgumentException("Unknown action type: " + action);
};
}

private static String getEnv(String key, String defaultValue) {
String s = System.getenv(key);
if (s == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public Scheduler(AutoScalerFactory autoScalerFactory, KubernetesClient k8sClient
}

public void onEvent(Event.Type type, AgentPool resource) {
log.info("onEvent -> handling {} for {}/{}", type, resource.getMetadata().getNamespace(), resource.getMetadata().getName());
synchronized (events) {
events.add(new Event(type, resource));
}
Expand Down Expand Up @@ -161,13 +162,13 @@ private AgentPoolInstance updateTargetSize(AgentPoolInstance i) throws IOExcepti
}

private void processActive(AgentPoolInstance i) throws IOException {
log.info("processActive ['{}']", i);
log.info("processActive ['{}']", i.getName());
List<Change> changes = planner.plan(i);
apply(changes);
}

private void processDeleted(AgentPoolInstance i) throws IOException {
log.info("processDeleted ['{}']", i);
log.info("processDeleted ['{}']", i.getName());
String resourceName = i.getName();

// remove all pool's pods
Expand Down

0 comments on commit 9320ea2

Please sign in to comment.