From 9efc2dd638e5124247fac5cbd951da907b1c4828 Mon Sep 17 00:00:00 2001 From: Trevor Date: Tue, 21 Nov 2023 07:00:06 -0400 Subject: [PATCH] Fixes for Pubber startup errors (#772) --- .github/workflows/testing.yml | 2 + bin/test_validator | 4 ++ etc/validator.out | 52 +++++++++---------- .../main/java/daq/pubber/ListPublisher.java | 28 ++++++---- .../src/main/java/daq/pubber/MqttDevice.java | 9 +--- .../main/java/daq/pubber/MqttPublisher.java | 45 +++++++--------- pubber/src/main/java/daq/pubber/Pubber.java | 52 ++++++++++++++++--- .../src/main/java/daq/pubber/Publisher.java | 6 +-- .../main/java/daq/pubber/SystemManager.java | 3 +- udmis/udmis.iml | 5 +- .../daq/mqtt/util/ObjectDiffEngine.java | 4 +- 11 files changed, 123 insertions(+), 87 deletions(-) diff --git a/.github/workflows/testing.yml b/.github/workflows/testing.yml index 68db80b564..7ffe4ea1e7 100644 --- a/.github/workflows/testing.yml +++ b/.github/workflows/testing.yml @@ -149,6 +149,8 @@ jobs: java-version: '17' - name: base setup run: bin/run_tests install_dependencies + - name: stagger startup + run: sleep $(($MATRIX_SHARD_INDEX * 20 + 20)) - name: registrar clean run: bin/test_regclean $TARGET_PROJECT - name: sequence tests clean diff --git a/bin/test_validator b/bin/test_validator index f9c91d2de7..5a8a9ea009 100755 --- a/bin/test_validator +++ b/bin/test_validator @@ -13,6 +13,10 @@ fi project_id=$1 shift +echo "export TARGET_PROJECT=$project_id" +echo "export UDMI_REGISTRY_SUFFIX=$UDMI_REGISTRY_SUFFIX" +echo "export UDMI_ALT_REGISTRY=$UDMI_ALT_REGISTRY" + [[ -n $GITHUB_RUN_NUMBER ]] && echo "Workflow run number $GITHUB_RUN_NUMBER" || true echo 'Using target project:' $project_id diff --git a/etc/validator.out b/etc/validator.out index 3b01b91cd6..0d945e8acd 100644 --- a/etc/validator.out +++ b/etc/validator.out @@ -15,7 +15,6 @@ sites/udmi_site_model/out/devices/AHU-22/event_pointset.json sites/udmi_site_model/out/devices/AHU-22/event_system.json sites/udmi_site_model/out/devices/AHU-22/persistent_data.json sites/udmi_site_model/out/devices/AHU-22/state.json -sites/udmi_site_model/out/devices/AHU-22/state_error.json sites/udmi_site_model/out/devices/AHU-22/state_localnet.json sites/udmi_site_model/out/devices/AHU-22/state_pointset.json sites/udmi_site_model/out/devices/AHU-22/state_system.json @@ -325,9 +324,9 @@ sites/udmi_site_model/out/devices/AHU-22/state.out "sub_folder" : "update", "sub_type" : "state", "status" : { - "message" : "Timestamp jitter REDACTED_DURATION (REDACTED_TIMESTAMP to REDACTED_TIMESTAMP) exceeds REDACTED_DURATION threshold", - "detail" : "state_update: Timestamp jitter REDACTED_DURATION (REDACTED_TIMESTAMP to REDACTED_TIMESTAMP) exceeds REDACTED_DURATION threshold REDACTED_ERROR", - "category" : "validation.device.schema", + "message" : "Multiple validation errors", + "detail" : "Timestamp jitter REDACTED_DURATION (REDACTED_TIMESTAMP to REDACTED_TIMESTAMP) exceeds REDACTED_DURATION threshold; Device has missing points: filter_alarm_pressure_status, filter_differential_pressure, filter_differential_pressure_sensor; Device has extra points: globulating_globar", + "category" : "validation.device.multiple", "timestamp" : "REDACTED_TIMESTAMP", "level" : 500 }, @@ -337,26 +336,15 @@ sites/udmi_site_model/out/devices/AHU-22/state.out "category" : "validation.device.schema", "timestamp" : "REDACTED_TIMESTAMP", "level" : 500 - } ] -} -:::::::::::::: -sites/udmi_site_model/out/devices/AHU-22/state_error.out -:::::::::::::: -{ - "timestamp" : "REDACTED_TIMESTAMP", - "version" : "1.4.2", - "sub_folder" : "error", - "sub_type" : "state", - "status" : { - "message" : "Error in message pipeline: While parsing json object, Unexpected character ('!' (code 33)): expected a valid value (JSON String, Number (or 'NaN'/'INF'/'+INF'), Array, Object or token 'null', 'true' or 'false') at [Source: (String)\"!&*@(!*&@!\"; line: 1, column: 2]", - "detail" : "state_error: Error in message pipeline: While parsing json object, Unexpected character ('!' (code 33)): expected a valid value (JSON String, Number (or 'NaN'/'INF'/'+INF'), Array, Object or token 'null', 'true' or 'false') at [Source: (String)\"!&*REDACTED_ERROR", + }, { + "message" : "Device has missing points: filter_alarm_pressure_status, filter_differential_pressure, filter_differential_pressure_sensor", + "detail" : "state_update: Device has missing points: filter_alarm_pressure_status, filter_differential_pressure, filter_differential_pressure_sensor", "category" : "validation.device.schema", "timestamp" : "REDACTED_TIMESTAMP", "level" : 500 - }, - "errors" : [ { - "message" : "Error in message pipeline: While parsing json object, Unexpected character ('!' (code 33)): expected a valid value (JSON String, Number (or 'NaN'/'INF'/'+INF'), Array, Object or token 'null', 'true' or 'false') at [Source: (String)\"!&*@(!*&@!\"; line: 1, column: 2]", - "detail" : "state_error: Error in message pipeline: While parsing json object, Unexpected character ('!' (code 33)): expected a valid value (JSON String, Number (or 'NaN'/'INF'/'+INF'), Array, Object or token 'null', 'true' or 'false') at [Source: (String)\"!&*REDACTED_ERROR", + }, { + "message" : "Device has extra points: globulating_globar", + "detail" : "state_update: Device has extra points: globulating_globar", "category" : "validation.device.schema", "timestamp" : "REDACTED_TIMESTAMP", "level" : 500 @@ -394,15 +382,15 @@ sites/udmi_site_model/out/devices/AHU-22/state_pointset.out "sub_folder" : "pointset", "sub_type" : "state", "status" : { - "message" : "Timestamp jitter REDACTED_DURATION (REDACTED_TIMESTAMP to REDACTED_TIMESTAMP) exceeds REDACTED_DURATION threshold", - "detail" : "state_pointset: Timestamp jitter REDACTED_DURATION (REDACTED_TIMESTAMP to REDACTED_TIMESTAMP) exceeds REDACTED_DURATION threshold REDACTED_ERROR", - "category" : "validation.device.schema", + "message" : "Multiple validation errors", + "detail" : "Timestamp jitter REDACTED_DURATION (REDACTED_TIMESTAMP to REDACTED_TIMESTAMP) exceeds REDACTED_DURATION threshold; Device has missing points: filter_alarm_pressure_status, filter_differential_pressure, filter_differential_pressure_sensor; Device has extra points: globulating_globar", + "category" : "validation.device.multiple", "timestamp" : "REDACTED_TIMESTAMP", "level" : 500 }, "pointset" : { - "missing" : [ ], - "extra" : [ ] + "missing" : [ "filter_alarm_pressure_status", "filter_differential_pressure", "filter_differential_pressure_sensor" ], + "extra" : [ "globulating_globar" ] }, "errors" : [ { "message" : "Timestamp jitter REDACTED_DURATION (REDACTED_TIMESTAMP to REDACTED_TIMESTAMP) exceeds REDACTED_DURATION threshold", @@ -410,6 +398,18 @@ sites/udmi_site_model/out/devices/AHU-22/state_pointset.out "category" : "validation.device.schema", "timestamp" : "REDACTED_TIMESTAMP", "level" : 500 + }, { + "message" : "Device has missing points: filter_alarm_pressure_status, filter_differential_pressure, filter_differential_pressure_sensor", + "detail" : "state_pointset: Device has missing points: filter_alarm_pressure_status, filter_differential_pressure, filter_differential_pressure_sensor", + "category" : "validation.device.schema", + "timestamp" : "REDACTED_TIMESTAMP", + "level" : 500 + }, { + "message" : "Device has extra points: globulating_globar", + "detail" : "state_pointset: Device has extra points: globulating_globar", + "category" : "validation.device.schema", + "timestamp" : "REDACTED_TIMESTAMP", + "level" : 500 } ] } :::::::::::::: diff --git a/pubber/src/main/java/daq/pubber/ListPublisher.java b/pubber/src/main/java/daq/pubber/ListPublisher.java index ea580d3729..59dabec9b5 100644 --- a/pubber/src/main/java/daq/pubber/ListPublisher.java +++ b/pubber/src/main/java/daq/pubber/ListPublisher.java @@ -1,14 +1,21 @@ package daq.pubber; +import static com.google.udmi.util.GeneralUtils.ifNotNullThen; + +import com.google.api.client.util.ArrayMap; import com.google.udmi.util.JsonUtil; +import java.util.AbstractMap.SimpleEntry; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; -import java.util.concurrent.CountDownLatch; +import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.function.BiConsumer; import java.util.function.Consumer; +import udmi.schema.Config; import udmi.schema.PubberConfiguration; +import udmi.schema.SystemConfig; /** * Publishes message to an in-memory list. @@ -19,6 +26,7 @@ public class ListPublisher implements Publisher { private final PubberConfiguration configuration; private List messages = new ArrayList<>(); private String usePrefix; + private final Map, Class>> handlers = new HashMap<>(); public ListPublisher(PubberConfiguration configuration, Consumer onError) { this.configuration = configuration; @@ -48,19 +56,22 @@ public void setDeviceTopicPrefix(String deviceId, String topicPrefix) { @Override public void registerHandler(String deviceId, String topicSuffix, Consumer handler, Class messageType) { - + Consumer foo = (Consumer) handler; + Class clazz = (Class) messageType; + handlers.put(topicSuffix, new SimpleEntry<>(foo, clazz)); } @Override - public void connect(String deviceId) { - + public void connect(String deviceId, boolean clean) { + Consumer handler = handlers.get("config").getKey(); + handler.accept(new Config()); } @Override public void publish(String deviceId, String topicSuffix, Object message, Runnable callback) { String useTopic = usePrefix + "/" + topicSuffix; messages.add(getMessageString(deviceId, useTopic, message)); - publisherExecutor.submit(callback); + ifNotNullThen(callback, () -> publisherExecutor.submit(callback)); } @Override @@ -68,11 +79,6 @@ public boolean isActive() { return false; } - @Override - public void startupLatchWait(CountDownLatch configLatch, String message) { - - } - @Override public void close() { diff --git a/pubber/src/main/java/daq/pubber/MqttDevice.java b/pubber/src/main/java/daq/pubber/MqttDevice.java index d8431b6625..a1df68bb7c 100644 --- a/pubber/src/main/java/daq/pubber/MqttDevice.java +++ b/pubber/src/main/java/daq/pubber/MqttDevice.java @@ -1,6 +1,5 @@ package daq.pubber; -import java.util.concurrent.CountDownLatch; import java.util.function.Consumer; import udmi.schema.PubberConfiguration; @@ -43,11 +42,11 @@ public void registerHandler(String topicSuffix, Consumer handler, Class EVENT_SERIAL = new HashMap<>(); private static final String GCP_CLIENT_PREFIX = "projects/"; + public static final String EMPTY_STRING = ""; private final Semaphore connectionLock = new Semaphore(1); @@ -235,15 +236,17 @@ private String getSendTopic(String deviceId, String topicSuffix) { } private void closeMqttClient(String deviceId) { - MqttClient removed = cleanClients(deviceId); - if (removed != null) { - try { - if (removed.isConnected()) { - removed.disconnect(); + synchronized (mqttClients) { + MqttClient removed = cleanClients(deviceId); + if (removed != null) { + try { + if (removed.isConnected()) { + removed.disconnect(); + } + removed.close(); + } catch (Exception e) { + error("Error closing MQTT client: " + e, null, "stop", e); } - removed.close(); - } catch (Exception e) { - error("Error closing MQTT client: " + e, null, "stop", e); } } } @@ -282,12 +285,11 @@ private MqttClient newBoundClient(String deviceId) { try { String gatewayId = getGatewayId(deviceId); debug(format("Connecting device %s through gateway %s", deviceId, gatewayId)); - MqttClient mqttClient = getConnectedClient(gatewayId); + final MqttClient mqttClient = getConnectedClient(gatewayId); startupLatchWait(connectionLatch, "gateway startup exchange"); String topic = getMessageTopic(deviceId, MqttDevice.ATTACH_TOPIC); - String payload = ""; info("Publishing attach message " + topic); - mqttClient.publish(topic, payload.getBytes(StandardCharsets.UTF_8), QOS_AT_LEAST_ONCE, + mqttClient.publish(topic, EMPTY_STRING.getBytes(StandardCharsets.UTF_8), QOS_AT_LEAST_ONCE, SHOULD_RETAIN); subscribeToUpdates(mqttClient, deviceId); return mqttClient; @@ -296,8 +298,7 @@ private MqttClient newBoundClient(String deviceId) { } } - @Override - public void startupLatchWait(CountDownLatch gatewayLatch, String designator) { + private void startupLatchWait(CountDownLatch gatewayLatch, String designator) { try { int waitTimeSec = ofNullable(configuration.endpoint.config_sync_sec) .orElse(DEFAULT_CONFIG_WAIT_SEC); @@ -480,7 +481,8 @@ private String getDeviceId(String topic) { return topic.split("/")[2]; } - public void connect(String targetId) { + public void connect(String targetId, boolean clean) { + ifTrueThen(clean, () -> closeMqttClient(targetId)); getConnectedClient(targetId); } @@ -542,13 +544,6 @@ private void checkAuthentication(String targetId) { reauthTimes.remove(authId); synchronized (mqttClients) { MqttClient client = cleanClients(authId); - if (client == null) { - return; - } - Set removeSet = mqttClients.entrySet().stream() - .filter(entry -> entry.getValue() == client).map(Entry::getKey) - .collect(Collectors.toSet()); - removeSet.forEach(mqttClients::remove); try { client.disconnect(); client.close(); @@ -659,9 +654,7 @@ public void messageArrived(String topic, MqttMessage message) { String messageType = getMessageType(topic); String handlerKey = getHandlerKey(topic); String deviceId = getDeviceId(topic); - if (getGatewayId(deviceId) == null) { - connectionLatch.countDown(); - } + connectionLatch.countDown(); Consumer handler = handlers.get(handlerKey); Class type = handlersType.get(handlerKey); if (handler == null) { diff --git a/pubber/src/main/java/daq/pubber/Pubber.java b/pubber/src/main/java/daq/pubber/Pubber.java index 49f8069adb..a6ac05de14 100644 --- a/pubber/src/main/java/daq/pubber/Pubber.java +++ b/pubber/src/main/java/daq/pubber/Pubber.java @@ -20,9 +20,11 @@ import static com.google.udmi.util.GeneralUtils.toJsonFile; import static com.google.udmi.util.GeneralUtils.toJsonString; import static com.google.udmi.util.JsonUtil.safeSleep; +import static com.google.udmi.util.JsonUtil.stringifyTerse; import static daq.pubber.MqttDevice.CONFIG_TOPIC; import static daq.pubber.MqttDevice.ERRORS_TOPIC; import static daq.pubber.MqttDevice.STATE_TOPIC; +import static daq.pubber.MqttPublisher.DEFAULT_CONFIG_WAIT_SEC; import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNullElse; @@ -160,7 +162,7 @@ public class Pubber extends ManagerBase implements ManagerHost { final Config deviceConfig = new Config(); private final File outDir; private final ScheduledExecutorService executor = new CatchingScheduledThreadPoolExecutor(1); - private final CountDownLatch configLatch = new CountDownLatch(1); + private CountDownLatch configLatch; private final AtomicBoolean stateDirty = new AtomicBoolean(); private final ReentrantLock stateLock = new ReentrantLock(); private final String deviceId; @@ -178,6 +180,7 @@ public class Pubber extends ManagerBase implements ManagerHost { private int deviceUpdateCount = -1; private DeviceManager deviceManager; private Map proxyDevices = new HashMap<>(); + private boolean isConnected; /** * Start an instance from a configuration file. @@ -206,6 +209,9 @@ public Pubber(String iotProject, String sitePath, String deviceId, String serial super(null, makeExplicitConfiguration(iotProject, sitePath, deviceId, serialNo)); this.deviceId = deviceId; outDir = new File(PUBBER_OUT + "/" + serialNo); + if (!outDir.exists()) { + checkState(outDir.mkdirs(), "could not make out dir " + outDir.getAbsolutePath()); + } if (PUBSUB_SITE.equals(sitePath)) { pubSubClient = new PubSubClient(iotProject, deviceId); } @@ -443,6 +449,7 @@ protected void initializePersistentStore() { private void writePersistentStore() { checkState(persistentData != null, "persistent data not defined"); toJsonFile(getPersistentStore(), persistentData); + warn("Updating persistent store: " + stringifyTerse(persistentData)); deviceManager.setPersistentData(persistentData); } @@ -633,6 +640,10 @@ private void sendEmptyMissingBadEvents() { } private void deferredConfigActions() { + if (!isConnected) { + return; + } + deviceManager.maybeRestartSystem(); // Do redirect after restart system check, since this might take a long time. @@ -654,6 +665,8 @@ private void captureExceptions(String action, Runnable runnable) { } protected void startConnection(Function connectionDone) { + String nonce = String.valueOf(System.currentTimeMillis()); + warn(format("Starting connection %s with %d", nonce, retriesRemaining.get())); try { this.connectionDone = connectionDone; while (retriesRemaining.getAndDecrement() > 0) { @@ -664,16 +677,20 @@ protected void startConnection(Function connectionDone) { throw new RuntimeException("Failed connection attempt after retries"); } catch (Exception e) { throw new RuntimeException("While attempting to start connection", e); + } finally { + warn(format("Ending connection %s with %d", nonce, retriesRemaining.get())); } } private boolean attemptConnection() { try { + isConnected = false; if (deviceTarget == null) { throw new RuntimeException("Mqtt publisher not initialized"); } connect(); - deviceTarget.startupLatchWait(configLatch, "initial config sync"); + configLatchWait(); + isConnected = true; return true; } catch (Exception e) { error("While waiting for connection start", e); @@ -683,6 +700,20 @@ private boolean attemptConnection() { return false; } + private void configLatchWait() { + try { + int waitTimeSec = ofNullable(configuration.endpoint.config_sync_sec) + .orElse(DEFAULT_CONFIG_WAIT_SEC); + int useWaitTime = waitTimeSec == 0 ? DEFAULT_CONFIG_WAIT_SEC : waitTimeSec; + warn(format("Start waiting %ds for config latch for %s", useWaitTime, deviceId)); + if (useWaitTime > 0 && !configLatch.await(useWaitTime, TimeUnit.SECONDS)) { + throw new RuntimeException("Config latch timeout"); + } + } catch (Exception e) { + throw new RuntimeException("While waiting for config latch", e); + } + } + protected void initialize() { try { initializeDevice(); @@ -765,6 +796,8 @@ private void ensureKeyBytes() { private void connect() { try { + warn("Creating new config latch for " + deviceId); + configLatch = new CountDownLatch(1); deviceTarget.connect(); info("Connection complete."); workingEndpoint = toJsonString(configuration.endpoint); @@ -784,7 +817,6 @@ private void publisherException(Exception toReport) { } else if (toReport instanceof ConnectionClosedException) { error("Connection closed, attempting reconnect..."); while (retriesRemaining.getAndDecrement() > 0) { - error("TAP2"); if (attemptConnection()) { return; } @@ -861,8 +893,12 @@ private String exceptionDetail(Throwable e) { private void configHandler(Config config) { try { configPreprocess(deviceId, config); - debug(format("Config update%s", deviceManager.getTestingTag()), toJsonString(config)); + debug(format("Config update %s%s", deviceId, deviceManager.getTestingTag()), + toJsonString(config)); processConfigUpdate(config); + if (configLatch.getCount() > 0) { + warn("Received config for config latch " + deviceId); + } configLatch.countDown(); publisherConfigLog("apply", null); } catch (Exception e) { @@ -884,7 +920,7 @@ private void processConfigUpdate(Config config) { // Grab this to make state-after-config updates monolithic. stateLock.lock(); } catch (Exception e) { - throw new RuntimeException("While acquiting state lock", e); + throw new RuntimeException("While acquiring state lock", e); } try { @@ -1369,13 +1405,13 @@ private void publishStateMessage() { } stateDirty.set(false); deviceState.timestamp = getNow(); - info(format("update state %s last_config %s", isoConvert(deviceState.timestamp), + info(format("Update state %s last_config %s", isoConvert(deviceState.timestamp), isoConvert(deviceState.system.last_config))); publishStateMessage(deviceState); } private void publishStateMessage(Object stateToSend) { - if (configLatch.getCount() > 0) { + if (configLatch == null || configLatch.getCount() > 0) { warn("Dropping state update until config received..."); return; } @@ -1390,7 +1426,7 @@ private void publishStateMessage(Object stateToSend) { CountDownLatch latch = new CountDownLatch(1); try { - debug(format("State update%s", deviceManager.getTestingTag()), + debug(format("State update %s%s", deviceId, deviceManager.getTestingTag()), toJsonString(stateToSend)); } catch (Exception e) { throw new RuntimeException("While converting new device state", e); diff --git a/pubber/src/main/java/daq/pubber/Publisher.java b/pubber/src/main/java/daq/pubber/Publisher.java index 2289d11e20..c762a3febc 100644 --- a/pubber/src/main/java/daq/pubber/Publisher.java +++ b/pubber/src/main/java/daq/pubber/Publisher.java @@ -1,6 +1,5 @@ package daq.pubber; -import java.util.concurrent.CountDownLatch; import java.util.function.Consumer; interface Publisher { @@ -29,14 +28,13 @@ void registerHandler(String deviceId, String topicSuffix, Consumer handle * Connect the given device id. * * @param deviceId device to connect + * @param clean true to clean previous connections */ - void connect(String deviceId); + void connect(String deviceId, boolean clean); void publish(String deviceId, String topicSuffix, Object message, Runnable callback); boolean isActive(); - void startupLatchWait(CountDownLatch configLatch, String message); - void close(); } diff --git a/pubber/src/main/java/daq/pubber/SystemManager.java b/pubber/src/main/java/daq/pubber/SystemManager.java index dc2200b240..76100f1ece 100644 --- a/pubber/src/main/java/daq/pubber/SystemManager.java +++ b/pubber/src/main/java/daq/pubber/SystemManager.java @@ -91,6 +91,8 @@ public SystemManager(ManagerHost host, PubberConfiguration configuration) { super(host, configuration); this.host = host; + info("Device start time is " + getTimestamp(DEVICE_START_TIME)); + systemState = new SystemState(); systemState.operation = new StateSystemOperation(); @@ -220,7 +222,6 @@ void updateConfig(SystemConfig system, Date timestamp) { systemState.last_config = timestamp; updateInterval(ifNotNullGet(system, config -> config.metrics_rate_sec)); updateState(); - maybeRestartSystem(); } void publishLogMessage(Entry report) { diff --git a/udmis/udmis.iml b/udmis/udmis.iml index 7cda0e6f97..c41036a730 100644 --- a/udmis/udmis.iml +++ b/udmis/udmis.iml @@ -1,8 +1,8 @@ - - + + @@ -20,7 +20,6 @@ - diff --git a/validator/src/main/java/com/google/daq/mqtt/util/ObjectDiffEngine.java b/validator/src/main/java/com/google/daq/mqtt/util/ObjectDiffEngine.java index 13a9aac4f3..22270355e4 100644 --- a/validator/src/main/java/com/google/daq/mqtt/util/ObjectDiffEngine.java +++ b/validator/src/main/java/com/google/daq/mqtt/util/ObjectDiffEngine.java @@ -116,7 +116,9 @@ void accumulateDifference(String prefix, Map left, Map updates) { right.forEach((key, value) -> { String describedKey = describedKey(prefix, key); - String describedValue = describeValue(prefix, key, semanticValue(value)); + String raw = describeValue(prefix, key, semanticValue(value)); + int index = raw.indexOf('\n'); + String describedValue = index < 0 ? raw : (raw.substring(0, index) + "..."); if (left != null && left.containsKey(key)) { Object leftValue = left.get(key); if (SemanticValue.equals(value, leftValue)) {