Skip to content

Commit

Permalink
Fix cron distribution (#875)
Browse files Browse the repository at this point in the history
  • Loading branch information
grafnu authored Apr 26, 2024
1 parent a3915c1 commit 8cf20ba
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ public class CronProcessor extends ProcessorBase {

private static final String PAYLOAD_SEPARATOR = ":";
private static final String PATH_SEPARATOR = "/";
private static final String ID_SEPARATOR = "~";
private static final long CUTOFF_INTERVALS = 3;
private static final DefaultMustacheFactory MUSTACHE_FACTORY = new DefaultMustacheFactory();
private static final SortedMap<String, Instant> TRACKER = new ConcurrentSkipListMap<>();
private static final String HEARTBEAT_NAME = "heartbeat";
private static final String HEARTBEAT_SUFFIX = PATH_SEPARATOR + HEARTBEAT_NAME;
private static final String HEARTBEAT_SUFFIX = ID_SEPARATOR + HEARTBEAT_NAME;
private static Integer HEARTBEAT_SEC;
private final Envelope srcEnvelope;
private final Mustache template;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,10 @@ public void publish(Envelope rawEnvelope, Object message, String source) {
public String getRouteId(String source) {
return format("%s%s%s", clientId, ROUTE_SEPARATOR, source);
}

@Override
public void activate() {
super.activate();
info("Distributing as client " + clientId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.udmi.util.Common.PUBLISH_TIME_KEY;
import static com.google.udmi.util.GeneralUtils.friendlyStackTrace;
import static com.google.udmi.util.GeneralUtils.ifNotNullThen;
import static com.google.udmi.util.GeneralUtils.ifTrueThen;
import static com.google.udmi.util.GeneralUtils.nullAsNull;
import static com.google.udmi.util.JsonUtil.getNowInstant;
import static com.google.udmi.util.JsonUtil.isoConvert;
import static com.google.udmi.util.JsonUtil.toStringMap;
import static java.lang.String.format;
import static java.util.Optional.ofNullable;
Expand Down Expand Up @@ -240,6 +243,7 @@ public void deliveryComplete(IMqttDeliveryToken token) {
public void messageArrived(String topic, MqttMessage message) {
try {
Map<String, String> envelopeMap = parseEnvelopeTopic(topic);
envelopeMap.put(PUBLISH_TIME_KEY, isoConvert());
receiveMessage(envelopeMap, new String(message.getPayload()));
} catch (Exception e) {
error("Exception receiving message on %s: %s", clientId, friendlyStackTrace(e));
Expand Down

0 comments on commit 8cf20ba

Please sign in to comment.