From 48397c75b33a3e0bbb72fb10a176ec19c21e3560 Mon Sep 17 00:00:00 2001 From: Trevor Date: Tue, 13 Feb 2024 09:31:32 -0800 Subject: [PATCH] Add periodic monitor logging of message pipe receive and publish processing (#810) --- .gencode_hash.txt | 14 +- gencode/docs/configuration_endpoint.html | 34 +++ gencode/docs/configuration_execution.html | 82 ++++++ gencode/docs/configuration_pod.html | 247 ++++++++++++++++++ gencode/docs/configuration_pubber.html | 41 +++ gencode/docs/persistent_device.html | 41 +++ .../udmi/schema/EndpointConfiguration.java | 11 +- .../udmi/schema/configuration_endpoint.py | 4 + schema/configuration_endpoint.json | 4 + udmis/etc/prod_pod.json | 1 + .../udmi/service/messaging/MessagePipe.java | 6 + .../messaging/impl/FileMessagePipe.java | 2 +- .../messaging/impl/LocalMessagePipe.java | 2 +- .../service/messaging/impl/MessageBase.java | 114 ++++++-- .../messaging/impl/MessageDispatcherImpl.java | 160 +++++++----- .../service/messaging/impl/PubSubPipe.java | 19 +- .../messaging/impl/SimpleMqttPipe.java | 2 +- .../messaging/impl/TraceMessagePipe.java | 2 +- .../bos/udmi/service/pod/UdmiServicePod.java | 5 + 19 files changed, 674 insertions(+), 117 deletions(-) diff --git a/.gencode_hash.txt b/.gencode_hash.txt index 15553119aa..ceef3614a7 100644 --- a/.gencode_hash.txt +++ b/.gencode_hash.txt @@ -2,10 +2,10 @@ e5ae5dd058ce298448741a75a6ac67166a9f197b32061b9cbfd47655861864e2 gencode/docs/command_mapping.html 99876633b27acba744ac737a9682e01258752c5439effd07193317767a41a0c6 gencode/docs/config.html 6e853ce072dde0bccd7cdeb31845301ba681841d7cb21a8ff90d023bc9597729 gencode/docs/config_mapping.html -08583688b20f892c0b453f41787ac01a46ac601663736bcd6ed6f57be0758e79 gencode/docs/configuration_endpoint.html -6cf94d6cb600c75cde32a64bd78acb3ed3b54adfad08dbf6bb159b467e8925c9 gencode/docs/configuration_execution.html -b2e70451576eb40924c3dab83dea8b4c6bbec6e6fcc93734473f384de9a2621b gencode/docs/configuration_pod.html -dc77a21a05f9f98a23403a1db0c3b468ef50d028e4c4934734d08e15f36b1d57 gencode/docs/configuration_pubber.html +faccddced0e45b97877a20cdf9e6a7d9e24e6600914edece33b81beb2d31919f gencode/docs/configuration_endpoint.html +b4645ffef561c91cc28a06259535704a5f5b9cad4447ff4f4159507bd00bebae gencode/docs/configuration_execution.html +9e6b17051a30a6b98a105d630a29aee102e42aa9056fdbbb8a393f17c8fbdb24 gencode/docs/configuration_pod.html +8b79fa7ec1d18581d83d8b26d96944f02d4fdd858620c7df5ba9c54d593c19bc gencode/docs/configuration_pubber.html a5454f8dc6a843115823d1122ce34e36a5e9058cd3f6ea42c3482c8b4b5adf72 gencode/docs/event.html f7268ffd426cd03007e85a7f73347888c569f2ae92f10ec5569885d1a6c5e807 gencode/docs/event_discovery.html d0db0ed95c6e2aef32cd55749a13053db2809503e82b20ee04b98fd7f58c1e93 gencode/docs/event_mapping.html @@ -14,7 +14,7 @@ f6ca85990e940f109a48e35b25102d8860e5707a7b2145412046f44d599ef635 gencode/docs/e 4c15e549ded3e0489e1717e24a68a94b3995e43c2032ea7ca1c89deaa4f18c21 gencode/docs/event_validation.html f927607a6acca2dc77bc748287c7db207b348aaef4a77811b4e0724c738ba095 gencode/docs/metadata.html 5d0fa84dde29cbc20b6fe3a3c17819c711360cc8b6388a680c3995a7dcc08a5f gencode/docs/monitoring.html -180b32717db748e164a185b163ef9a97aa83d9d6add306283d5b9852d04af947 gencode/docs/persistent_device.html +8f303c9d6ea962651185d2a79ad90d8febc51b0c474b9eab38c91cb3bbe35cec gencode/docs/persistent_device.html 5d039d607af9ec75ee552dfe36b16c702687ea16f5663f41fc49b4533b86e00d gencode/docs/properties.html 1766f84518a315fe57e4a4bf934c0a386ad61d87091754a6bab097c686c16019 gencode/docs/readme.md 741b880216be3743f6747800a042f2dbd89f3b0344c6b0a965f4bc010f03a930 gencode/docs/schema_doc.css @@ -51,7 +51,7 @@ e809df42a73ed843977e447fa3333140dcdbe7d09b4e88bf1b8fd52fbaf9ae1a gencode/java/u 5b4508ce3ac0235c9ab97494aa0f2d2ccad88ebc83da28eca2c405fd906e84af gencode/java/udmi/schema/DiscoveryEvent.java 04112dd47b0f761131c276c67d3cd8b789d25e6716b5732be9fef14fc6831f1d gencode/java/udmi/schema/DiscoveryModel.java 0a11a539707571f79bd82b1958886cecae3209e2daef36dfca885adb4c61a07a gencode/java/udmi/schema/DiscoveryState.java -b01b7a05f21651a01a605466bb80b2e537059dcbb41f33734c851432da57f657 gencode/java/udmi/schema/EndpointConfiguration.java +8c972e1404ad801924c61723f40636e0d4a5a300e408af0d3ce5a96bea830d5b gencode/java/udmi/schema/EndpointConfiguration.java dc25e685886e11a741418be9191a478e13c0244647b5a0cac65d6c1e55055578 gencode/java/udmi/schema/Entry.java 06758aca1e0043ddf343b504030f47bb19260e99a82e2d66f12e86092a2434ca gencode/java/udmi/schema/Enumerate.java 8a51984458d96d1798d067005902defa57410167a27dcfb0b730ea38a3326011 gencode/java/udmi/schema/Envelope.java @@ -145,7 +145,7 @@ ac3facbd96f7cb2f7e387e7497d6a36af379a2687329571f250c5670f9933244 gencode/python 7da3bdb37f338260d5f3829fa5fcbb9bbf9f146b514a68319c314a96c6b8ac12 gencode/python/udmi/schema/config_system.py cce623b34fd694880039a1c080214c33e00acaef5bc72276cf11a3bb2de40000 gencode/python/udmi/schema/config_system_testing.py 30b1809e364cb3f7070002bb4a9954b11b25543b099b4bbe450d280001e4de55 gencode/python/udmi/schema/config_udmi.py -ae3f2f71fc81b1cffc45ffe9d1c179fba8bb09a398a305e7ba42aa8aeabe125a gencode/python/udmi/schema/configuration_endpoint.py +b7b9166569d7f2ee0a98afce697709deeda61b2152abf54bcc8d680170540ab2 gencode/python/udmi/schema/configuration_endpoint.py 14fd646b9a8638b87e4c421c9dadfb7ed2e66ad02b256217423e3b5dd6c39fd1 gencode/python/udmi/schema/configuration_execution.py e30f937983f98673b3e67ac1369fe86964d785092964f7e95cd39611f9283d7c gencode/python/udmi/schema/configuration_pod.py c61ee66daa7e632850bc2705370baa8c057e7a34792c6a29b9e4ca00c0eb195b gencode/python/udmi/schema/configuration_pod_base.py diff --git a/gencode/docs/configuration_endpoint.html b/gencode/docs/configuration_endpoint.html index 6a57b55b38..44dbd0c590 100644 --- a/gencode/docs/configuration_endpoint.html +++ b/gencode/docs/configuration_endpoint.html @@ -366,6 +366,40 @@

+ + + + +
+
+
+

+ +

+
+ +
+
+ + Type: integer
+

Period for monitoring sampling

+
+ + + + + +
diff --git a/gencode/docs/configuration_execution.html b/gencode/docs/configuration_execution.html index 3c96e201a9..9fca9cf3e3 100644 --- a/gencode/docs/configuration_execution.html +++ b/gencode/docs/configuration_execution.html @@ -869,6 +869,47 @@

+

+ + + +
+
+
+

+ +

+
+ +
+
+ + Type: integer
+

Period for monitoring sampling

+
+ + + + + +
@@ -1698,6 +1739,47 @@

+

+ + + +
+
+
+

+ +

+
+ +
+
+ + Type: integer
+

Period for monitoring sampling

+
+ + + + + +
diff --git a/gencode/docs/configuration_pod.html b/gencode/docs/configuration_pod.html index f7cccfd6b6..b68430732a 100644 --- a/gencode/docs/configuration_pod.html +++ b/gencode/docs/configuration_pod.html @@ -586,6 +586,47 @@

+

+ + + +
+
+
+

+ +

+
+ +
+
+ + Type: integer
+

Period for monitoring sampling

+
+ + + + + +
@@ -1526,6 +1567,54 @@

+

+ + + +
+
+
+

+ +

+
+ +
+
+ + Type: integer
+

Period for monitoring sampling

+
+ + + + + +
@@ -2689,6 +2778,61 @@

+

+ + + +
+
+
+

+ +

+
+ +
+
+ + Type: integer
+

Period for monitoring sampling

+
+ + + + + +
@@ -3784,6 +3928,61 @@

+

+ + + +
+
+
+

+ +

+
+ +
+
+ + Type: integer
+

Period for monitoring sampling

+
+ + + + + +
@@ -5117,6 +5316,54 @@

+

+ + + +
+
+
+

+ +

+
+ +
+
+ + Type: integer
+

Period for monitoring sampling

+
+ + + + + +
diff --git a/gencode/docs/configuration_pubber.html b/gencode/docs/configuration_pubber.html index 7236c91d73..a46d7fb12e 100644 --- a/gencode/docs/configuration_pubber.html +++ b/gencode/docs/configuration_pubber.html @@ -468,6 +468,47 @@

+

+ + + +
+
+
+

+ +

+
+ +
+
+ + Type: integer
+

Period for monitoring sampling

+
+ + + + + +
diff --git a/gencode/docs/persistent_device.html b/gencode/docs/persistent_device.html index c697b03360..810e9c9bf4 100644 --- a/gencode/docs/persistent_device.html +++ b/gencode/docs/persistent_device.html @@ -468,6 +468,47 @@

+

+ + + +
+
+
+

+ +

+
+ +
+
+ + Type: integer
+

Period for monitoring sampling

+
+ + + + + +
diff --git a/gencode/java/udmi/schema/EndpointConfiguration.java b/gencode/java/udmi/schema/EndpointConfiguration.java index c4de47f23e..96dc7ce766 100644 --- a/gencode/java/udmi/schema/EndpointConfiguration.java +++ b/gencode/java/udmi/schema/EndpointConfiguration.java @@ -31,6 +31,7 @@ "msg_prefix", "recv_id", "send_id", + "monitor_sec", "distributor", "auth_provider", "generation" @@ -98,6 +99,13 @@ public class EndpointConfiguration { @JsonProperty("send_id") @JsonPropertyDescription("Id for the sending messages channel") public String send_id; + /** + * Period for monitoring sampling + * + */ + @JsonProperty("monitor_sec") + @JsonPropertyDescription("Period for monitoring sampling") + public Integer monitor_sec; /** * processor designation for a distributor channel * @@ -119,6 +127,7 @@ public class EndpointConfiguration { public int hashCode() { int result = 1; result = ((result* 31)+((this.generation == null)? 0 :this.generation.hashCode())); + result = ((result* 31)+((this.monitor_sec == null)? 0 :this.monitor_sec.hashCode())); result = ((result* 31)+((this.transport == null)? 0 :this.transport.hashCode())); result = ((result* 31)+((this.error == null)? 0 :this.error.hashCode())); result = ((result* 31)+((this.config_sync_sec == null)? 0 :this.config_sync_sec.hashCode())); @@ -143,7 +152,7 @@ public boolean equals(Object other) { return false; } EndpointConfiguration rhs = ((EndpointConfiguration) other); - return ((((((((((((((this.generation == rhs.generation)||((this.generation!= null)&&this.generation.equals(rhs.generation)))&&((this.transport == rhs.transport)||((this.transport!= null)&&this.transport.equals(rhs.transport))))&&((this.error == rhs.error)||((this.error!= null)&&this.error.equals(rhs.error))))&&((this.config_sync_sec == rhs.config_sync_sec)||((this.config_sync_sec!= null)&&this.config_sync_sec.equals(rhs.config_sync_sec))))&&((this.distributor == rhs.distributor)||((this.distributor!= null)&&this.distributor.equals(rhs.distributor))))&&((this.client_id == rhs.client_id)||((this.client_id!= null)&&this.client_id.equals(rhs.client_id))))&&((this.msg_prefix == rhs.msg_prefix)||((this.msg_prefix!= null)&&this.msg_prefix.equals(rhs.msg_prefix))))&&((this.send_id == rhs.send_id)||((this.send_id!= null)&&this.send_id.equals(rhs.send_id))))&&((this.protocol == rhs.protocol)||((this.protocol!= null)&&this.protocol.equals(rhs.protocol))))&&((this.hostname == rhs.hostname)||((this.hostname!= null)&&this.hostname.equals(rhs.hostname))))&&((this.port == rhs.port)||((this.port!= null)&&this.port.equals(rhs.port))))&&((this.recv_id == rhs.recv_id)||((this.recv_id!= null)&&this.recv_id.equals(rhs.recv_id))))&&((this.auth_provider == rhs.auth_provider)||((this.auth_provider!= null)&&this.auth_provider.equals(rhs.auth_provider)))); + return (((((((((((((((this.generation == rhs.generation)||((this.generation!= null)&&this.generation.equals(rhs.generation)))&&((this.monitor_sec == rhs.monitor_sec)||((this.monitor_sec!= null)&&this.monitor_sec.equals(rhs.monitor_sec))))&&((this.transport == rhs.transport)||((this.transport!= null)&&this.transport.equals(rhs.transport))))&&((this.error == rhs.error)||((this.error!= null)&&this.error.equals(rhs.error))))&&((this.config_sync_sec == rhs.config_sync_sec)||((this.config_sync_sec!= null)&&this.config_sync_sec.equals(rhs.config_sync_sec))))&&((this.distributor == rhs.distributor)||((this.distributor!= null)&&this.distributor.equals(rhs.distributor))))&&((this.client_id == rhs.client_id)||((this.client_id!= null)&&this.client_id.equals(rhs.client_id))))&&((this.msg_prefix == rhs.msg_prefix)||((this.msg_prefix!= null)&&this.msg_prefix.equals(rhs.msg_prefix))))&&((this.send_id == rhs.send_id)||((this.send_id!= null)&&this.send_id.equals(rhs.send_id))))&&((this.protocol == rhs.protocol)||((this.protocol!= null)&&this.protocol.equals(rhs.protocol))))&&((this.hostname == rhs.hostname)||((this.hostname!= null)&&this.hostname.equals(rhs.hostname))))&&((this.port == rhs.port)||((this.port!= null)&&this.port.equals(rhs.port))))&&((this.recv_id == rhs.recv_id)||((this.recv_id!= null)&&this.recv_id.equals(rhs.recv_id))))&&((this.auth_provider == rhs.auth_provider)||((this.auth_provider!= null)&&this.auth_provider.equals(rhs.auth_provider)))); } @Generated("jsonschema2pojo") diff --git a/gencode/python/udmi/schema/configuration_endpoint.py b/gencode/python/udmi/schema/configuration_endpoint.py index 5c940de83c..888f09fd3c 100644 --- a/gencode/python/udmi/schema/configuration_endpoint.py +++ b/gencode/python/udmi/schema/configuration_endpoint.py @@ -56,6 +56,7 @@ def __init__(self): self.msg_prefix = None self.recv_id = None self.send_id = None + self.monitor_sec = None self.distributor = None self.auth_provider = None self.generation = None @@ -75,6 +76,7 @@ def from_dict(source): result.msg_prefix = source.get('msg_prefix') result.recv_id = source.get('recv_id') result.send_id = source.get('send_id') + result.monitor_sec = source.get('monitor_sec') result.distributor = source.get('distributor') result.auth_provider = ObjectA90DCC28.from_dict(source.get('auth_provider')) result.generation = source.get('generation') @@ -118,6 +120,8 @@ def to_dict(self): result['recv_id'] = self.recv_id # 5 if self.send_id: result['send_id'] = self.send_id # 5 + if self.monitor_sec: + result['monitor_sec'] = self.monitor_sec # 5 if self.distributor: result['distributor'] = self.distributor # 5 if self.auth_provider: diff --git a/schema/configuration_endpoint.json b/schema/configuration_endpoint.json index 5e93a430b6..2d072d3d30 100644 --- a/schema/configuration_endpoint.json +++ b/schema/configuration_endpoint.json @@ -55,6 +55,10 @@ "type": "string", "pattern": "^[-_/a-zA-Z0-9#]+$" }, + "monitor_sec": { + "description": "Period for monitoring sampling", + "type": "integer" + }, "distributor": { "description": "processor designation for a distributor channel", "type": "string", diff --git a/udmis/etc/prod_pod.json b/udmis/etc/prod_pod.json index 9d60ffeae9..2be6db3d10 100644 --- a/udmis/etc/prod_pod.json +++ b/udmis/etc/prod_pod.json @@ -6,6 +6,7 @@ "flow_defaults": { "protocol": "pubsub", "hostname": "${GCP_PROJECT}", + "monitor_sec": 60, "distributor": "stately" }, "flows": { diff --git a/udmis/src/main/java/com/google/bos/udmi/service/messaging/MessagePipe.java b/udmis/src/main/java/com/google/bos/udmi/service/messaging/MessagePipe.java index e186249c8f..ad27c2f570 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/messaging/MessagePipe.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/messaging/MessagePipe.java @@ -10,6 +10,7 @@ import com.google.bos.udmi.service.messaging.impl.TraceMessagePipe; import com.google.common.collect.ImmutableMap; import java.util.Map; +import java.util.Map.Entry; import java.util.function.Consumer; import java.util.function.Function; import udmi.schema.EndpointConfiguration; @@ -62,4 +63,9 @@ static MessagePipe from(EndpointConfiguration config) { * Shutdown an active pipe so that it no longer processes received messages. */ void shutdown(); + + /** + * Atomically extract a count/sum pair of message publish durations. + */ + Map> extractStats(); } \ No newline at end of file diff --git a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/FileMessagePipe.java b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/FileMessagePipe.java index 2f81d5511d..3afbd62333 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/FileMessagePipe.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/FileMessagePipe.java @@ -96,7 +96,7 @@ private void fileOutHandler(String sendId) { } @Override - public void publish(Bundle bundle) { + protected void publishRaw(Bundle bundle) { if (outFileRoot == null) { throw new IllegalStateException("trace out file not defined, no send_id"); } diff --git a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/LocalMessagePipe.java b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/LocalMessagePipe.java index de3d9766f5..cbabb6fe31 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/LocalMessagePipe.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/LocalMessagePipe.java @@ -64,7 +64,7 @@ private Function> trackedQueue(String name) { /** * Publish a message bundle to this pipe. Simply pushes it into the outgoing queue! */ - public void publish(Bundle bundle) { + protected void publishRaw(Bundle bundle) { try { debug("Publishing bundle to %s", this); pushQueueEntry(destinationQueue, stringify(bundle)); diff --git a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/MessageBase.java b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/MessageBase.java index 767023657e..864054cb87 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/MessageBase.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/MessageBase.java @@ -22,19 +22,25 @@ import com.google.bos.udmi.service.pod.ContainerBase; import com.google.bos.udmi.service.pod.UdmiServicePod; import com.google.common.base.Strings; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.AtomicDouble; import java.time.Duration; import java.time.Instant; +import java.util.AbstractMap.SimpleEntry; import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.TestOnly; @@ -52,16 +58,30 @@ public abstract class MessageBase extends ContainerBase implements MessagePipe { public static final String INVALID_ENVELOPE_KEY = "invalid"; public static final int EXECUTION_THREADS = 4; public static final String ERROR_MESSAGE_MARKER = "error-mark"; + public static final String PUBLISH_STATS = "publish"; + public static final String RECEIVE_STATS = "receive"; static final String TERMINATE_MARKER = "terminate"; private static final String DEFAULT_NAMESPACE = "default-namespace"; private static final Set HANDLED_QUEUES = new HashSet<>(); private static final long DEFAULT_POLL_TIME_SEC = 1; private static final long AWAIT_TERMINATION_SEC = 10; private final ExecutorService executor = Executors.newFixedThreadPool(EXECUTION_THREADS); + private final Entry publishStats = makeEmptyStats(); + private final Entry receiveStats = makeEmptyStats(); + private final String pipeId; private BlockingQueue sourceQueue; private Consumer dispatcher; private boolean activated; + public MessageBase() { + pipeId = getClass().getSimpleName(); + } + + public MessageBase(EndpointConfiguration configuration) { + pipeId = Optional.ofNullable(configuration.error).map(flow -> "flow:" + flow) + .orElse(getClass().getSimpleName()); + } + /** * Combine two message configurations together (for applying defaults). */ @@ -99,6 +119,8 @@ protected Bundle makeHelloBundle() { return bundle; } + protected abstract void publishRaw(Bundle bundle); + protected void pushQueueEntry(BlockingQueue queue, String stringBundle) { try { requireNonNull(stringBundle, "missing queue bundle"); @@ -118,32 +140,11 @@ protected void receiveMessage(Map envelopeMap, Map message protected void receiveMessage(Map attributesMap, String messageString) { grabExecutionContext(); - - final Object messageObject; - try { - messageObject = parseJson(messageString); - } catch (Exception e) { - receiveException(attributesMap, messageString, e, SubFolder.ERROR); - return; - } - final Envelope envelope; - - try { - sanitizeAttributeMap(attributesMap); - envelope = convertTo(Envelope.class, attributesMap); - } catch (Exception e) { - attributesMap.put(INVALID_ENVELOPE_KEY, "true"); - receiveException(attributesMap, messageString, e, null); - return; - } - + final Instant start = Instant.now(); try { - Bundle bundle = new Bundle(envelope, messageObject); - debug("Received %s/%s -> %s %s", bundle.envelope.subType, bundle.envelope.subFolder, - queueIdentifier(), bundle.envelope.transactionId); - receiveBundle(bundle); - } catch (Exception e) { - receiveException(attributesMap, messageString, e, null); + receiveMessageRaw(attributesMap, messageString); + } finally { + accumulateStats(receiveStats, Duration.between(start, Instant.now())); } } @@ -162,12 +163,23 @@ protected void terminateHandlers() { } } + private synchronized void accumulateStats(Entry stats, + Duration duration) { + double seconds = duration.getSeconds() + duration.toMillisPart() / 1000.0; + stats.getKey().incrementAndGet(); + stats.getValue().addAndGet(seconds); + } + private synchronized void ensureSourceQueue() { if (sourceQueue == null) { sourceQueue = new LinkedBlockingDeque<>(); } } + private Entry extractStat(Entry stats) { + return new SimpleEntry<>(stats.getKey().getAndSet(0), stats.getValue().getAndSet(0)); + } + @Nullable private String getFromSourceQueue() throws InterruptedException { QueueEntry poll = sourceQueue.poll(DEFAULT_POLL_TIME_SEC, TimeUnit.SECONDS); @@ -196,6 +208,10 @@ private void handleQueue() { } } + private Entry makeEmptyStats() { + return new SimpleEntry<>(new AtomicInteger(), new AtomicDouble()); + } + private void messageLoop(String id) { info("Starting message loop %s", id); while (true) { @@ -255,6 +271,35 @@ private void receiveException(Map attributesMap, String messageS receiveBundle(stringify(bundle)); } + private void receiveMessageRaw(Map attributesMap, String messageString) { + final Object messageObject; + try { + messageObject = parseJson(messageString); + } catch (Exception e) { + receiveException(attributesMap, messageString, e, SubFolder.ERROR); + return; + } + final Envelope envelope; + + try { + sanitizeAttributeMap(attributesMap); + envelope = convertTo(Envelope.class, attributesMap); + } catch (Exception e) { + attributesMap.put(INVALID_ENVELOPE_KEY, "true"); + receiveException(attributesMap, messageString, e, null); + return; + } + + try { + Bundle bundle = new Bundle(envelope, messageObject); + debug("Received %s/%s -> %s %s", bundle.envelope.subType, bundle.envelope.subFolder, + queueIdentifier(), bundle.envelope.transactionId); + receiveBundle(bundle); + } catch (Exception e) { + receiveException(attributesMap, messageString, e, null); + } + } + private void sanitizeAttributeMap(Map attributesMap) { String subFolderRaw = attributesMap.get(SUBFOLDER_PROPERTY_KEY); if (subFolderRaw == null) { @@ -314,6 +359,13 @@ public void awaitShutdown() { } } + @Override + public synchronized Map> extractStats() { + return ImmutableMap.of( + PUBLISH_STATS, extractStat(publishStats), + RECEIVE_STATS, extractStat(receiveStats)); + } + @Override public boolean isActive() { return activated; @@ -334,7 +386,15 @@ public Bundle poll() { } } - public abstract void publish(Bundle bundle); + @Override + public final void publish(Bundle bundle) { + Instant start = Instant.now(); + try { + publishRaw(bundle); + } finally { + accumulateStats(publishStats, Duration.between(start, Instant.now())); + } + } @Override public void shutdown() { @@ -355,7 +415,7 @@ public void terminate() { @Override public String toString() { - return format("MessagePipe %s => %s", queueIdentifier(), Objects.hash(dispatcher)); + return format("%s %s => %s", pipeId, queueIdentifier(), Objects.hash(dispatcher)); } /** diff --git a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/MessageDispatcherImpl.java b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/MessageDispatcherImpl.java index ca80ffa8c7..380bda7a3a 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/MessageDispatcherImpl.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/MessageDispatcherImpl.java @@ -1,5 +1,7 @@ package com.google.bos.udmi.service.messaging.impl; +import static com.google.bos.udmi.service.messaging.impl.MessageBase.PUBLISH_STATS; +import static com.google.bos.udmi.service.messaging.impl.MessageBase.RECEIVE_STATS; import static com.google.common.base.Preconditions.checkState; import static com.google.udmi.util.GeneralUtils.deepCopy; import static com.google.udmi.util.JsonUtil.convertToStrict; @@ -7,6 +9,7 @@ import static com.google.udmi.util.JsonUtil.toMap; import static java.lang.String.format; import static java.util.Objects.requireNonNull; +import static java.util.Optional.ofNullable; import com.google.bos.udmi.service.messaging.ConfigUpdate; import com.google.bos.udmi.service.messaging.MessageContinuation; @@ -28,9 +31,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; -import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import org.jetbrains.annotations.NotNull; @@ -71,6 +77,9 @@ public class MessageDispatcherImpl extends ContainerBase implements MessageDispa private final Map, AtomicInteger> handlerCounts = new ConcurrentHashMap<>(); private final String projectId; private final ThreadLocal threadEnvelope = new ThreadLocal<>(); + private final int monitorSec; + private final ScheduledExecutorService monitorExecutor = + Executors.newSingleThreadScheduledExecutor(); /** * Create a new instance of the message dispatcher. @@ -78,6 +87,7 @@ public class MessageDispatcherImpl extends ContainerBase implements MessageDispa public MessageDispatcherImpl(EndpointConfiguration configuration) { messagePipe = MessagePipe.from(configuration); projectId = variableSubstitution(configuration.hostname, "project_id/hostname not defined"); + monitorSec = ofNullable(configuration.monitor_sec).orElse(0); } @Nullable @@ -90,7 +100,7 @@ private static Object convertStrictOrObject(Class handlerType, Object message } private static String getMapKey(SubType subType, SubFolder subFolder) { - SubType useType = Optional.ofNullable(subType).orElse(SubType.EVENT); + SubType useType = ofNullable(subType).orElse(SubType.EVENT); return format("%s/%s", useType, subFolder); } @@ -132,31 +142,44 @@ private static void registerMessageClass(SubType type, SubFolder folder, Class handlerType, Object messageObject) { + handlers.get(handlerType).accept(messageObject); + synchronized (handlerCounts) { + handlerCounts.computeIfAbsent(handlerType, key -> new AtomicInteger()).incrementAndGet(); + handlerCounts.notify(); + } + } + + private void extractAndLog(Map> countSum, String key) { + Entry stats = countSum.get(key); + double rate = stats.getKey() / (double) monitorSec; + double average = stats.getValue() / stats.getKey(); + debug("Pipe %s %s count %.3f/s latency %.03fs", messagePipe, key, rate, average); + } + + private Envelope getThreadEnvelope() { + return requireNonNull(threadEnvelope.get(), "thread envelope not defined"); + } + /** - * Execute the runnable with the envelope mapped for the message. + * Set the message envelope to use for published messages from the current thread. */ - @VisibleForTesting - public void withEnvelopeFor(Envelope envelope, Object message, Runnable run) { - try { - messageEnvelopes.put(message, envelope); - setThreadEnvelope(envelope); - run.run(); - } finally { - messageEnvelopes.remove(message); - setThreadEnvelope(null); + public void setThreadEnvelope(Envelope envelope) { + Envelope previous = threadEnvelope.get(); + threadEnvelope.set(envelope); + if (previous != null && envelope != null) { + throw new RuntimeException("Overwriting existing thread envelope"); } } - private void processHandler(Envelope envelope, Class handlerType, Object messageObject) { - withEnvelopeFor(envelope, messageObject, () -> executeHandler(handlerType, messageObject)); + private void periodicMonitor() { + Map> countSum = messagePipe.extractStats(); + extractAndLog(countSum, RECEIVE_STATS); + extractAndLog(countSum, PUBLISH_STATS); } - private void executeHandler(Class handlerType, Object messageObject) { - handlers.get(handlerType).accept(messageObject); - synchronized (handlerCounts) { - handlerCounts.computeIfAbsent(handlerType, key -> new AtomicInteger()).incrementAndGet(); - handlerCounts.notify(); - } + private void processHandler(Envelope envelope, Class handlerType, Object messageObject) { + withEnvelopeFor(envelope, messageObject, () -> executeHandler(handlerType, messageObject)); } /** @@ -191,6 +214,10 @@ public void activate() { Consumer processMessage = this::processMessage; info(format("%s activating %s with %08x", this, messagePipe, Objects.hash(processMessage))); messagePipe.activate(processMessage); + if (monitorSec > 0) { + monitorExecutor.scheduleAtFixedRate(this::periodicMonitor, monitorSec, monitorSec, + TimeUnit.SECONDS); + } } @TestOnly @@ -217,48 +244,6 @@ public List drain() { } } - /** - * Set the message envelope to use for published messages from the current thread. - */ - public void setThreadEnvelope(Envelope envelope) { - Envelope previous = threadEnvelope.get(); - threadEnvelope.set(envelope); - if (previous != null && envelope != null) { - throw new RuntimeException("Overwriting existing thread envelope"); - } - } - - /** - * Wait for a message of the given handler type to be processed. Primarily for testing. - */ - public void waitForMessageProcessed(Class clazz) { - synchronized (handlerCounts) { - try { - Instant endTime = Instant.now().plusMillis(HANDLER_TIMEOUT_MS); - do { - handlerCounts.wait(HANDLER_TIMEOUT_MS); - } while (getHandlerCount(clazz) == 0 && Instant.now().isBefore(endTime)); - } catch (InterruptedException e) { - throw new RuntimeException("While waiting for handler count update", e); - } - } - } - - @Override - public MessageContinuation withEnvelope(Envelope envelope) { - return new MessageContinuation() { - @Override - public Envelope getEnvelope() { - return envelope; - } - - @Override - public void publish(Object message) { - publishBundle(makeMessageBundle(envelope, message)); - } - }; - } - @Override public MessageContinuation getContinuation(Object message) { Envelope messageEnvelope = messageEnvelopes.get(message); @@ -283,10 +268,6 @@ public void publish(Object message) { }; } - private Envelope getThreadEnvelope() { - return requireNonNull(threadEnvelope.get(), "thread envelope not defined"); - } - @Override public int getHandlerCount(Class clazz) { return handlerCounts.computeIfAbsent(clazz, key -> new AtomicInteger()).get(); @@ -356,6 +337,7 @@ public void resetForTest() { @Override public void shutdown() { messagePipe.shutdown(); + monitorExecutor.shutdown(); } public void terminate() { @@ -367,4 +349,50 @@ public String toString() { return format("Dispatcher %08x", Objects.hash(this)); } + /** + * Wait for a message of the given handler type to be processed. Primarily for testing. + */ + public void waitForMessageProcessed(Class clazz) { + synchronized (handlerCounts) { + try { + Instant endTime = Instant.now().plusMillis(HANDLER_TIMEOUT_MS); + do { + handlerCounts.wait(HANDLER_TIMEOUT_MS); + } while (getHandlerCount(clazz) == 0 && Instant.now().isBefore(endTime)); + } catch (InterruptedException e) { + throw new RuntimeException("While waiting for handler count update", e); + } + } + } + + @Override + public MessageContinuation withEnvelope(Envelope envelope) { + return new MessageContinuation() { + @Override + public Envelope getEnvelope() { + return envelope; + } + + @Override + public void publish(Object message) { + publishBundle(makeMessageBundle(envelope, message)); + } + }; + } + + /** + * Execute the runnable with the envelope mapped for the message. + */ + @VisibleForTesting + public void withEnvelopeFor(Envelope envelope, Object message, Runnable run) { + try { + messageEnvelopes.put(message, envelope); + setThreadEnvelope(envelope); + run.run(); + } finally { + messageEnvelopes.remove(message); + setThreadEnvelope(null); + } + } + } \ No newline at end of file diff --git a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/PubSubPipe.java b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/PubSubPipe.java index 3bd4fecb56..d42530a9f3 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/PubSubPipe.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/PubSubPipe.java @@ -2,7 +2,6 @@ import static com.google.udmi.util.Common.SUBFOLDER_PROPERTY_KEY; import static com.google.udmi.util.Common.SUBTYPE_PROPERTY_KEY; -import static com.google.udmi.util.GeneralUtils.CSV_JOINER; import static com.google.udmi.util.GeneralUtils.friendlyStackTrace; import static com.google.udmi.util.GeneralUtils.ifNotNullGet; import static com.google.udmi.util.GeneralUtils.ifNotNullThen; @@ -60,21 +59,24 @@ public class PubSubPipe extends MessageBase implements MessageReceiver { private final List subscribers; private final Publisher publisher; private final String projectId; + private final String topicId; /** * Create a new instance based off the configuration. */ public PubSubPipe(EndpointConfiguration configuration) { + super(configuration); try { projectId = variableSubstitution(configuration.hostname, "no project id defined in configuration as 'hostname'"); - publisher = ifNotNullGet(variableSubstitution(configuration.send_id), this::getPublisher); + topicId = variableSubstitution(configuration.send_id); + publisher = ifNotNullGet(topicId, this::getPublisher); ifNotNullThen(publisher, this::checkPublisher); subscribers = ifNotNullGet(multiSubstitution(configuration.recv_id), this::getSubscribers); String subscriptionNames = subscribers.stream().map(Subscriber::getSubscriptionNameString) .collect(Collectors.joining(", ")); String topicName = ifNotNullGet(publisher, Publisher::getTopicNameString); - debug("PubSub %s -> %s", super.toString(), subscriptionNames, topicName); + debug("PubSub %s %s -> %s", super.toString(), subscriptionNames, topicName); } catch (Exception e) { throw new RuntimeException("While creating PubSub pipe", e); } @@ -130,7 +132,7 @@ public void activate(Consumer bundleConsumer) { } @Override - public void publish(Bundle bundle) { + protected void publishRaw(Bundle bundle) { if (publisher == null) { trace("Dropping message because publisher is null"); return; @@ -147,8 +149,7 @@ public void publish(Bundle bundle) { ApiFuture publish = publisher.publish(message); String publishedId = publish.get(); debug(format("Published PubSub %s/%s to %s as %s", stringMap.get(SUBTYPE_PROPERTY_KEY), - stringMap.get(SUBFOLDER_PROPERTY_KEY), publisher.getTopicNameString(), - PS_TXN_PREFIX + publishedId)); + stringMap.get(SUBFOLDER_PROPERTY_KEY), topicId, PS_TXN_PREFIX + publishedId)); } catch (Exception e) { throw new RuntimeException("While publishing bundle to " + publisher.getTopicNameString(), e); } @@ -156,7 +157,6 @@ public void publish(Bundle bundle) { @Override public void receiveMessage(PubsubMessage message, AckReplyConsumer reply) { - final Instant start = Instant.now(); Map attributesMap = new HashMap<>(message.getAttributesMap()); // Ack first to prevent a recurring loop of processing a faulty message. reply.ack(); @@ -165,11 +165,6 @@ public void receiveMessage(PubsubMessage message, AckReplyConsumer reply) { key -> isoConvert(ofEpochSecond(message.getPublishTime().getSeconds()))); attributesMap.computeIfAbsent(Common.TRANSACTION_KEY, key -> PS_TXN_PREFIX + messageId); receiveMessage(attributesMap, message.getData().toStringUtf8()); - Instant end = Instant.now(); - long seconds = Duration.between(start, end).getSeconds(); - if (seconds > 1) { - warn("Receive message took %ss", seconds); - } } private void stopAndWait(Subscriber subscriber) { diff --git a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipe.java b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipe.java index 16f8efe553..f4305c2998 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipe.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipe.java @@ -173,7 +173,7 @@ public void shutdown() { } @Override - public void publish(Bundle bundle) { + protected void publishRaw(Bundle bundle) { try { mqttClient.publish(makeMqttTopic(bundle), makeMqttMessage(bundle)); } catch (Exception e) { diff --git a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/TraceMessagePipe.java b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/TraceMessagePipe.java index 622a14fe27..50135043bb 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/TraceMessagePipe.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/TraceMessagePipe.java @@ -86,7 +86,7 @@ private void traceOutHandler(String sendId) { } @Override - public void publish(Bundle bundle) { + protected void publishRaw(Bundle bundle) { if (traceOutFile == null) { throw new IllegalStateException("trace out file not defined, no send_id"); } diff --git a/udmis/src/main/java/com/google/bos/udmi/service/pod/UdmiServicePod.java b/udmis/src/main/java/com/google/bos/udmi/service/pod/UdmiServicePod.java index 796203cf4a..0c98dd3f3a 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/pod/UdmiServicePod.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/pod/UdmiServicePod.java @@ -18,6 +18,7 @@ import com.google.bos.udmi.service.core.ReflectProcessor; import com.google.bos.udmi.service.core.StateProcessor; import com.google.bos.udmi.service.core.TargetProcessor; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; import java.io.File; import java.util.Map; @@ -171,12 +172,16 @@ private void createBridge(String name, BridgePodConfiguration config) { } private void createDistributor(String name, EndpointConfiguration config) { + checkState(config.error == null, "config error/name already set"); + config.error = name; putComponent(name, () -> DistributorPipe.from(config)); } private void createFlow(String name, EndpointConfiguration config) { checkState(PROCESSORS.containsKey(name), "unknown flow key " + name); Class clazz = PROCESSORS.get(name); + Preconditions.checkState(config.error == null, "config error/name already set"); + config.error = name; putComponent(name, () -> ProcessorBase.create(clazz, makeConfig(config))); }