Skip to content

Commit

Permalink
Reverting changes to make TDD test failure
Browse files Browse the repository at this point in the history
  • Loading branch information
grafnu committed Oct 1, 2024
1 parent 2bf9375 commit f8f060a
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import udmi.schema.Category;
Expand Down Expand Up @@ -171,9 +172,10 @@ public boolean hasErrors() {
* Validate a message against specific message-type expectations (outside of base schema).
*
* @param message Message to validate
* @param timestamp message timestamp string (rather than pull from typed object)
* @param attributes message attributes
*/
public void validateMessageType(Object message, Map<String, String> attributes) {
public void validateMessageType(Object message, Date timestamp, Map<String, String> attributes) {
if (reportingPointset == null) {
return;
}
Expand Down
101 changes: 47 additions & 54 deletions validator/src/main/java/com/google/daq/mqtt/validator/Validator.java
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,6 @@ private ReportingDevice validateMessageCore(Object message, Map<String, String>
try {
String schemaName = messageSchema(attributes);
if (!device.markMessageType(schemaName, getNow())) {
outputLogger.info("Ignoring %s/%s because ???", deviceId, schemaName);
return null;
}

Expand All @@ -613,16 +612,7 @@ private ReportingDevice validateMessageCore(Object message, Map<String, String>
}

writeDeviceOutDir(message, attributes, deviceId, schemaName);

if (message instanceof String) {
String detail = format("Raw string message for %s %s", deviceId, schemaName);
outputLogger.error(detail);
IllegalArgumentException exception = new IllegalArgumentException(detail);
device.addError(exception, attributes, Category.VALIDATION_DEVICE_RECEIVE);
return device;
}

validateDeviceMessage(device, mapCast(message), attributes);
validateDeviceMessage(device, message, attributes);

if (!device.hasErrors()) {
outputLogger.info("Validation clean %s/%s", deviceId, schemaName);
Expand All @@ -637,12 +627,21 @@ private ReportingDevice validateMessageCore(Object message, Map<String, String>
/**
* Validate a device message against the core schema.
*/
public void validateDeviceMessage(ReportingDevice device, Map<String, Object> message,
public void validateDeviceMessage(ReportingDevice device, Object baseMsg,
Map<String, String> attributes) {
String deviceId = attributes.get("deviceId");
device.clearMessageEntries();
String schemaName = messageSchema(attributes);

if (baseMsg instanceof String) {
String message = format("Raw string message for %s %s", deviceId, schemaName);
outputLogger.error(message);
IllegalArgumentException exception = new IllegalArgumentException(message);
device.addError(exception, attributes, Category.VALIDATION_DEVICE_RECEIVE);
return;
}

Map<String, Object> message = mapCast(baseMsg);
if (message.get(EXCEPTION_KEY) instanceof Exception exception) {
outputLogger.error("Pipeline exception " + deviceId + ": " + getExceptionMessage(exception));
device.addError(exception, attributes, Category.VALIDATION_DEVICE_RECEIVE);
Expand All @@ -660,7 +659,41 @@ public void validateDeviceMessage(ReportingDevice device, Map<String, Object> me

upgradeMessage(schemaName, message);

validateTimestamp(device, message, attributes);
String timestampRaw = (String) message.get("timestamp");
Instant timestamp = ifNotNullGet(timestampRaw, JsonUtil::getInstant);
String publishRaw = attributes.get(PUBLISH_TIME_KEY);
Instant publishTime = ifNotNullGet(publishRaw, JsonUtil::getInstant);
try {
// TODO: Validate message contests to make sure state sub-blocks don't also have timestamp.

String subTypeRaw = ofNullable(attributes.get(SUBTYPE_PROPERTY_KEY))
.orElse(UNKNOWN_TYPE_DEFAULT);
boolean lastSeenValid = LAST_SEEN_SUBTYPES.contains(SubType.fromValue(subTypeRaw));
if (lastSeenValid) {
if (publishTime != null) {
device.updateLastSeen(Date.from(publishTime));
}
if (timestamp == null) {
throw new RuntimeException("Missing message timestamp");
}
if (publishTime != null) {
if (!timestampRaw.endsWith(TIMESTAMP_ZULU_SUFFIX)
&& !timestampRaw.endsWith(TIMESTAMP_UTC_SUFFIX_1)
&& !timestampRaw.endsWith(TIMESTAMP_UTC_SUFFIX_2)) {
throw new RuntimeException("Invalid timestamp timezone " + timestampRaw);
}
long between = Duration.between(publishTime, timestamp).getSeconds();
if (between > TIMESTAMP_JITTER_SEC || between < -TIMESTAMP_JITTER_SEC) {
throw new RuntimeException(format(
"Timestamp jitter %ds (%s to %s) exceeds %ds threshold",
between, publishRaw, timestampRaw, TIMESTAMP_JITTER_SEC));
}
}
}
} catch (Exception e) {
outputLogger.error("Timestamp validation error: " + friendlyStackTrace(e));
device.addError(e, attributes, Category.VALIDATION_DEVICE_CONTENT);
}

try {
if (!schemaMap.containsKey(schemaName)) {
Expand Down Expand Up @@ -693,7 +726,7 @@ public void validateDeviceMessage(ReportingDevice device, Map<String, Object> me
try {
ifNotNullThen(CONTENT_VALIDATORS.get(schemaName), targetClass -> {
Object messageObject = OBJECT_MAPPER.convertValue(message, targetClass);
device.validateMessageType(messageObject, attributes);
device.validateMessageType(messageObject, JsonUtil.getDate(publishRaw), attributes);
});
} catch (Exception e) {
outputLogger.error("Error validating contents: " + friendlyStackTrace(e));
Expand All @@ -704,46 +737,6 @@ public void validateDeviceMessage(ReportingDevice device, Map<String, Object> me
}
}

private void validateTimestamp(ReportingDevice device, Map<String, Object> message,
Map<String, String> attributes) {
String timestampRaw = (String) message.get("timestamp");
Instant timestamp = ifNotNullGet(timestampRaw, JsonUtil::getInstant);
String publishRaw = attributes.get(PUBLISH_TIME_KEY);
Instant publishTime = ifNotNullGet(publishRaw, JsonUtil::getInstant);
try {
// TODO: Validate message contests to make sure state sub-blocks don't also have timestamp.

String subTypeRaw = ofNullable(attributes.get(SUBTYPE_PROPERTY_KEY))
.orElse(UNKNOWN_TYPE_DEFAULT);
boolean lastSeenValid = LAST_SEEN_SUBTYPES.contains(SubType.fromValue(subTypeRaw));
if (lastSeenValid) {
if (publishTime != null) {
device.updateLastSeen(Date.from(publishTime));
}
if (timestamp == null) {
throw new RuntimeException("Missing message timestamp");
}
if (timestampRaw != null
&& !timestampRaw.endsWith(TIMESTAMP_ZULU_SUFFIX)
&& !timestampRaw.endsWith(TIMESTAMP_UTC_SUFFIX_1)
&& !timestampRaw.endsWith(TIMESTAMP_UTC_SUFFIX_2)) {
throw new RuntimeException("Invalid timestamp timezone " + timestampRaw);
}
if (publishTime != null) {
long between = Duration.between(publishTime, timestamp).getSeconds();
if (between > TIMESTAMP_JITTER_SEC || between < -TIMESTAMP_JITTER_SEC) {
throw new RuntimeException(format(
"Timestamp jitter %ds (%s to %s) exceeds %ds threshold",
between, publishRaw, timestampRaw, TIMESTAMP_JITTER_SEC));
}
}
}
} catch (Exception e) {
outputLogger.error("Timestamp validation error: " + friendlyStackTrace(e));
device.addError(e, attributes, Category.VALIDATION_DEVICE_CONTENT);
}
}

private void sendValidationResult(Map<String, String> origAttributes,
ReportingDevice reportingDevice, Date now) {
try {
Expand Down

0 comments on commit f8f060a

Please sign in to comment.