From 8cf20ba8eed13531ca7a67291dbbc7da53db23b2 Mon Sep 17 00:00:00 2001 From: Trevor Date: Fri, 26 Apr 2024 09:49:39 -0400 Subject: [PATCH] Fix cron distribution (#875) --- .../com/google/bos/udmi/service/core/CronProcessor.java | 3 ++- .../com/google/bos/udmi/service/core/DistributorPipe.java | 6 ++++++ .../bos/udmi/service/messaging/impl/SimpleMqttPipe.java | 4 ++++ 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/udmis/src/main/java/com/google/bos/udmi/service/core/CronProcessor.java b/udmis/src/main/java/com/google/bos/udmi/service/core/CronProcessor.java index b2d40f97ff..366cf4ade4 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/core/CronProcessor.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/core/CronProcessor.java @@ -31,11 +31,12 @@ public class CronProcessor extends ProcessorBase { private static final String PAYLOAD_SEPARATOR = ":"; private static final String PATH_SEPARATOR = "/"; + private static final String ID_SEPARATOR = "~"; private static final long CUTOFF_INTERVALS = 3; private static final DefaultMustacheFactory MUSTACHE_FACTORY = new DefaultMustacheFactory(); private static final SortedMap TRACKER = new ConcurrentSkipListMap<>(); private static final String HEARTBEAT_NAME = "heartbeat"; - private static final String HEARTBEAT_SUFFIX = PATH_SEPARATOR + HEARTBEAT_NAME; + private static final String HEARTBEAT_SUFFIX = ID_SEPARATOR + HEARTBEAT_NAME; private static Integer HEARTBEAT_SEC; private final Envelope srcEnvelope; private final Mustache template; diff --git a/udmis/src/main/java/com/google/bos/udmi/service/core/DistributorPipe.java b/udmis/src/main/java/com/google/bos/udmi/service/core/DistributorPipe.java index a6f730d7e6..f2c6cb7df7 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/core/DistributorPipe.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/core/DistributorPipe.java @@ -80,4 +80,10 @@ public void publish(Envelope rawEnvelope, Object message, String source) { public String getRouteId(String source) { return format("%s%s%s", clientId, ROUTE_SEPARATOR, source); } + + @Override + public void activate() { + super.activate(); + info("Distributing as client " + clientId); + } } diff --git a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipe.java b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipe.java index 8542899749..9e0115303f 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipe.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipe.java @@ -2,10 +2,13 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import static com.google.udmi.util.Common.PUBLISH_TIME_KEY; import static com.google.udmi.util.GeneralUtils.friendlyStackTrace; import static com.google.udmi.util.GeneralUtils.ifNotNullThen; import static com.google.udmi.util.GeneralUtils.ifTrueThen; import static com.google.udmi.util.GeneralUtils.nullAsNull; +import static com.google.udmi.util.JsonUtil.getNowInstant; +import static com.google.udmi.util.JsonUtil.isoConvert; import static com.google.udmi.util.JsonUtil.toStringMap; import static java.lang.String.format; import static java.util.Optional.ofNullable; @@ -240,6 +243,7 @@ public void deliveryComplete(IMqttDeliveryToken token) { public void messageArrived(String topic, MqttMessage message) { try { Map envelopeMap = parseEnvelopeTopic(topic); + envelopeMap.put(PUBLISH_TIME_KEY, isoConvert()); receiveMessage(envelopeMap, new String(message.getPayload())); } catch (Exception e) { error("Exception receiving message on %s: %s", clientId, friendlyStackTrace(e));