Skip to content

Commit

Permalink
Fixes for Pubber startup errors (#772)
Browse files Browse the repository at this point in the history
  • Loading branch information
grafnu authored Nov 21, 2023
1 parent 298c93c commit 9efc2dd
Show file tree
Hide file tree
Showing 11 changed files with 123 additions and 87 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ jobs:
java-version: '17'
- name: base setup
run: bin/run_tests install_dependencies
- name: stagger startup
run: sleep $(($MATRIX_SHARD_INDEX * 20 + 20))
- name: registrar clean
run: bin/test_regclean $TARGET_PROJECT
- name: sequence tests clean
Expand Down
4 changes: 4 additions & 0 deletions bin/test_validator
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ fi
project_id=$1
shift

echo "export TARGET_PROJECT=$project_id"
echo "export UDMI_REGISTRY_SUFFIX=$UDMI_REGISTRY_SUFFIX"
echo "export UDMI_ALT_REGISTRY=$UDMI_ALT_REGISTRY"

[[ -n $GITHUB_RUN_NUMBER ]] && echo "Workflow run number $GITHUB_RUN_NUMBER" || true
echo 'Using target project:' $project_id

Expand Down
52 changes: 26 additions & 26 deletions etc/validator.out
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ sites/udmi_site_model/out/devices/AHU-22/event_pointset.json
sites/udmi_site_model/out/devices/AHU-22/event_system.json
sites/udmi_site_model/out/devices/AHU-22/persistent_data.json
sites/udmi_site_model/out/devices/AHU-22/state.json
sites/udmi_site_model/out/devices/AHU-22/state_error.json
sites/udmi_site_model/out/devices/AHU-22/state_localnet.json
sites/udmi_site_model/out/devices/AHU-22/state_pointset.json
sites/udmi_site_model/out/devices/AHU-22/state_system.json
Expand Down Expand Up @@ -325,9 +324,9 @@ sites/udmi_site_model/out/devices/AHU-22/state.out
"sub_folder" : "update",
"sub_type" : "state",
"status" : {
"message" : "Timestamp jitter REDACTED_DURATION (REDACTED_TIMESTAMP to REDACTED_TIMESTAMP) exceeds REDACTED_DURATION threshold",
"detail" : "state_update: Timestamp jitter REDACTED_DURATION (REDACTED_TIMESTAMP to REDACTED_TIMESTAMP) exceeds REDACTED_DURATION threshold REDACTED_ERROR",
"category" : "validation.device.schema",
"message" : "Multiple validation errors",
"detail" : "Timestamp jitter REDACTED_DURATION (REDACTED_TIMESTAMP to REDACTED_TIMESTAMP) exceeds REDACTED_DURATION threshold; Device has missing points: filter_alarm_pressure_status, filter_differential_pressure, filter_differential_pressure_sensor; Device has extra points: globulating_globar",
"category" : "validation.device.multiple",
"timestamp" : "REDACTED_TIMESTAMP",
"level" : 500
},
Expand All @@ -337,26 +336,15 @@ sites/udmi_site_model/out/devices/AHU-22/state.out
"category" : "validation.device.schema",
"timestamp" : "REDACTED_TIMESTAMP",
"level" : 500
} ]
}
::::::::::::::
sites/udmi_site_model/out/devices/AHU-22/state_error.out
::::::::::::::
{
"timestamp" : "REDACTED_TIMESTAMP",
"version" : "1.4.2",
"sub_folder" : "error",
"sub_type" : "state",
"status" : {
"message" : "Error in message pipeline: While parsing json object, Unexpected character ('!' (code 33)): expected a valid value (JSON String, Number (or 'NaN'/'INF'/'+INF'), Array, Object or token 'null', 'true' or 'false') at [Source: (String)\"!&*@(!*&@!\"; line: 1, column: 2]",
"detail" : "state_error: Error in message pipeline: While parsing json object, Unexpected character ('!' (code 33)): expected a valid value (JSON String, Number (or 'NaN'/'INF'/'+INF'), Array, Object or token 'null', 'true' or 'false') at [Source: (String)\"!&*REDACTED_ERROR",
}, {
"message" : "Device has missing points: filter_alarm_pressure_status, filter_differential_pressure, filter_differential_pressure_sensor",
"detail" : "state_update: Device has missing points: filter_alarm_pressure_status, filter_differential_pressure, filter_differential_pressure_sensor",
"category" : "validation.device.schema",
"timestamp" : "REDACTED_TIMESTAMP",
"level" : 500
},
"errors" : [ {
"message" : "Error in message pipeline: While parsing json object, Unexpected character ('!' (code 33)): expected a valid value (JSON String, Number (or 'NaN'/'INF'/'+INF'), Array, Object or token 'null', 'true' or 'false') at [Source: (String)\"!&*@(!*&@!\"; line: 1, column: 2]",
"detail" : "state_error: Error in message pipeline: While parsing json object, Unexpected character ('!' (code 33)): expected a valid value (JSON String, Number (or 'NaN'/'INF'/'+INF'), Array, Object or token 'null', 'true' or 'false') at [Source: (String)\"!&*REDACTED_ERROR",
}, {
"message" : "Device has extra points: globulating_globar",
"detail" : "state_update: Device has extra points: globulating_globar",
"category" : "validation.device.schema",
"timestamp" : "REDACTED_TIMESTAMP",
"level" : 500
Expand Down Expand Up @@ -394,22 +382,34 @@ sites/udmi_site_model/out/devices/AHU-22/state_pointset.out
"sub_folder" : "pointset",
"sub_type" : "state",
"status" : {
"message" : "Timestamp jitter REDACTED_DURATION (REDACTED_TIMESTAMP to REDACTED_TIMESTAMP) exceeds REDACTED_DURATION threshold",
"detail" : "state_pointset: Timestamp jitter REDACTED_DURATION (REDACTED_TIMESTAMP to REDACTED_TIMESTAMP) exceeds REDACTED_DURATION threshold REDACTED_ERROR",
"category" : "validation.device.schema",
"message" : "Multiple validation errors",
"detail" : "Timestamp jitter REDACTED_DURATION (REDACTED_TIMESTAMP to REDACTED_TIMESTAMP) exceeds REDACTED_DURATION threshold; Device has missing points: filter_alarm_pressure_status, filter_differential_pressure, filter_differential_pressure_sensor; Device has extra points: globulating_globar",
"category" : "validation.device.multiple",
"timestamp" : "REDACTED_TIMESTAMP",
"level" : 500
},
"pointset" : {
"missing" : [ ],
"extra" : [ ]
"missing" : [ "filter_alarm_pressure_status", "filter_differential_pressure", "filter_differential_pressure_sensor" ],
"extra" : [ "globulating_globar" ]
},
"errors" : [ {
"message" : "Timestamp jitter REDACTED_DURATION (REDACTED_TIMESTAMP to REDACTED_TIMESTAMP) exceeds REDACTED_DURATION threshold",
"detail" : "state_pointset: Timestamp jitter REDACTED_DURATION (REDACTED_TIMESTAMP to REDACTED_TIMESTAMP) exceeds REDACTED_DURATION threshold REDACTED_ERROR",
"category" : "validation.device.schema",
"timestamp" : "REDACTED_TIMESTAMP",
"level" : 500
}, {
"message" : "Device has missing points: filter_alarm_pressure_status, filter_differential_pressure, filter_differential_pressure_sensor",
"detail" : "state_pointset: Device has missing points: filter_alarm_pressure_status, filter_differential_pressure, filter_differential_pressure_sensor",
"category" : "validation.device.schema",
"timestamp" : "REDACTED_TIMESTAMP",
"level" : 500
}, {
"message" : "Device has extra points: globulating_globar",
"detail" : "state_pointset: Device has extra points: globulating_globar",
"category" : "validation.device.schema",
"timestamp" : "REDACTED_TIMESTAMP",
"level" : 500
} ]
}
::::::::::::::
Expand Down
28 changes: 17 additions & 11 deletions pubber/src/main/java/daq/pubber/ListPublisher.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
package daq.pubber;

import static com.google.udmi.util.GeneralUtils.ifNotNullThen;

import com.google.api.client.util.ArrayMap;
import com.google.udmi.util.JsonUtil;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import udmi.schema.Config;
import udmi.schema.PubberConfiguration;
import udmi.schema.SystemConfig;

/**
* Publishes message to an in-memory list.
Expand All @@ -19,6 +26,7 @@ public class ListPublisher implements Publisher {
private final PubberConfiguration configuration;
private List<String> messages = new ArrayList<>();
private String usePrefix;
private final Map<String, Entry<Consumer<Object>, Class<Object>>> handlers = new HashMap<>();

public ListPublisher(PubberConfiguration configuration, Consumer<Exception> onError) {
this.configuration = configuration;
Expand Down Expand Up @@ -48,31 +56,29 @@ public void setDeviceTopicPrefix(String deviceId, String topicPrefix) {
@Override
public <T> void registerHandler(String deviceId, String topicSuffix,
Consumer<T> handler, Class<T> messageType) {

Consumer<Object> foo = (Consumer<Object>) handler;
Class<Object> clazz = (Class<Object>) messageType;
handlers.put(topicSuffix, new SimpleEntry<>(foo, clazz));
}

@Override
public void connect(String deviceId) {

public void connect(String deviceId, boolean clean) {
Consumer<Object> handler = handlers.get("config").getKey();
handler.accept(new Config());
}

@Override
public void publish(String deviceId, String topicSuffix, Object message, Runnable callback) {
String useTopic = usePrefix + "/" + topicSuffix;
messages.add(getMessageString(deviceId, useTopic, message));
publisherExecutor.submit(callback);
ifNotNullThen(callback, () -> publisherExecutor.submit(callback));
}

@Override
public boolean isActive() {
return false;
}

@Override
public void startupLatchWait(CountDownLatch configLatch, String message) {

}

@Override
public void close() {

Expand Down
9 changes: 2 additions & 7 deletions pubber/src/main/java/daq/pubber/MqttDevice.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package daq.pubber;

import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import udmi.schema.PubberConfiguration;

Expand Down Expand Up @@ -43,11 +42,11 @@ public <T> void registerHandler(String topicSuffix, Consumer<T> handler, Class<T
}

public void connect() {
publisher.connect(deviceId);
publisher.connect(deviceId, true);
}

public void connect(String targetId) {
publisher.connect(targetId);
publisher.connect(targetId, false);
}

public void publish(String deviceId, String topicSuffix, Object message, Runnable callback) {
Expand All @@ -58,10 +57,6 @@ public boolean isActive() {
return publisher.isActive();
}

public void startupLatchWait(CountDownLatch configLatch, String message) {
publisher.startupLatchWait(configLatch, message);
}

public void close() {
publisher.close();
}
Expand Down
45 changes: 19 additions & 26 deletions pubber/src/main/java/daq/pubber/MqttPublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.udmi.util.GeneralUtils.ifTrueGet;
import static com.google.udmi.util.GeneralUtils.ifTrueThen;
import static com.google.udmi.util.GeneralUtils.isTrue;
import static java.lang.String.format;
import static java.util.Optional.ofNullable;
Expand Down Expand Up @@ -38,7 +39,6 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.http.ConnectionClosedException;
Expand Down Expand Up @@ -81,10 +81,11 @@ public class MqttPublisher implements Publisher {
private static final int TOKEN_EXPIRY_MINUTES = 60;
private static final int QOS_AT_MOST_ONCE = 0;
private static final int QOS_AT_LEAST_ONCE = 1;
private static final int DEFAULT_CONFIG_WAIT_SEC = 10;
static final int DEFAULT_CONFIG_WAIT_SEC = 10;
private static final String EVENT_MARK_PREFIX = "events/";
private static final Map<String, AtomicInteger> EVENT_SERIAL = new HashMap<>();
private static final String GCP_CLIENT_PREFIX = "projects/";
public static final String EMPTY_STRING = "";

private final Semaphore connectionLock = new Semaphore(1);

Expand Down Expand Up @@ -235,15 +236,17 @@ private String getSendTopic(String deviceId, String topicSuffix) {
}

private void closeMqttClient(String deviceId) {
MqttClient removed = cleanClients(deviceId);
if (removed != null) {
try {
if (removed.isConnected()) {
removed.disconnect();
synchronized (mqttClients) {
MqttClient removed = cleanClients(deviceId);
if (removed != null) {
try {
if (removed.isConnected()) {
removed.disconnect();
}
removed.close();
} catch (Exception e) {
error("Error closing MQTT client: " + e, null, "stop", e);
}
removed.close();
} catch (Exception e) {
error("Error closing MQTT client: " + e, null, "stop", e);
}
}
}
Expand Down Expand Up @@ -282,12 +285,11 @@ private MqttClient newBoundClient(String deviceId) {
try {
String gatewayId = getGatewayId(deviceId);
debug(format("Connecting device %s through gateway %s", deviceId, gatewayId));
MqttClient mqttClient = getConnectedClient(gatewayId);
final MqttClient mqttClient = getConnectedClient(gatewayId);
startupLatchWait(connectionLatch, "gateway startup exchange");
String topic = getMessageTopic(deviceId, MqttDevice.ATTACH_TOPIC);
String payload = "";
info("Publishing attach message " + topic);
mqttClient.publish(topic, payload.getBytes(StandardCharsets.UTF_8), QOS_AT_LEAST_ONCE,
mqttClient.publish(topic, EMPTY_STRING.getBytes(StandardCharsets.UTF_8), QOS_AT_LEAST_ONCE,
SHOULD_RETAIN);
subscribeToUpdates(mqttClient, deviceId);
return mqttClient;
Expand All @@ -296,8 +298,7 @@ private MqttClient newBoundClient(String deviceId) {
}
}

@Override
public void startupLatchWait(CountDownLatch gatewayLatch, String designator) {
private void startupLatchWait(CountDownLatch gatewayLatch, String designator) {
try {
int waitTimeSec = ofNullable(configuration.endpoint.config_sync_sec)
.orElse(DEFAULT_CONFIG_WAIT_SEC);
Expand Down Expand Up @@ -480,7 +481,8 @@ private String getDeviceId(String topic) {
return topic.split("/")[2];
}

public void connect(String targetId) {
public void connect(String targetId, boolean clean) {
ifTrueThen(clean, () -> closeMqttClient(targetId));
getConnectedClient(targetId);
}

Expand Down Expand Up @@ -542,13 +544,6 @@ private void checkAuthentication(String targetId) {
reauthTimes.remove(authId);
synchronized (mqttClients) {
MqttClient client = cleanClients(authId);
if (client == null) {
return;
}
Set<String> removeSet = mqttClients.entrySet().stream()
.filter(entry -> entry.getValue() == client).map(Entry::getKey)
.collect(Collectors.toSet());
removeSet.forEach(mqttClients::remove);
try {
client.disconnect();
client.close();
Expand Down Expand Up @@ -659,9 +654,7 @@ public void messageArrived(String topic, MqttMessage message) {
String messageType = getMessageType(topic);
String handlerKey = getHandlerKey(topic);
String deviceId = getDeviceId(topic);
if (getGatewayId(deviceId) == null) {
connectionLatch.countDown();
}
connectionLatch.countDown();
Consumer<Object> handler = handlers.get(handlerKey);
Class<Object> type = handlersType.get(handlerKey);
if (handler == null) {
Expand Down
Loading

0 comments on commit 9efc2dd

Please sign in to comment.