Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MessageUpgrader to validator #286

Merged
merged 62 commits into from
Apr 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
619bfee
Make gateway results more flexible
grafnu Mar 21, 2022
21f32d0
Gencode
grafnu Mar 21, 2022
9cddf4d
Single test running capability
grafnu Mar 23, 2022
1cc8d1c
Basic cleanup of test runner
grafnu Mar 23, 2022
a39b0e7
Cleanup
grafnu Mar 23, 2022
81ca9a7
Limit test class
grafnu Mar 24, 2022
08c5f2b
Test method target checking
grafnu Mar 24, 2022
7a55b50
Disable test method
grafnu Mar 24, 2022
47c2785
Linty
grafnu Mar 24, 2022
3f72cf8
linty
grafnu Mar 24, 2022
2693c4a
Single test on command line
grafnu Mar 24, 2022
f93b174
Diagnostics
grafnu Mar 24, 2022
5e2610a
Use . to mean all tests
grafnu Mar 24, 2022
6d7d029
Merge branch 'master' into gatewayerror
grafnu Mar 24, 2022
0f5d45a
Single test fix
grafnu Mar 25, 2022
9ee92ed
Cleanup gateway.out
grafnu Mar 25, 2022
f20fbf4
Update out/ directory to be under the site model
grafnu Mar 25, 2022
78bb691
Cleanup of useFirestore
grafnu Mar 25, 2022
b95f4e4
Fix config timestamp/version generation
grafnu Mar 25, 2022
34beb8f
Add log level filtering
grafnu Mar 26, 2022
6b5fc70
Adding logLevel flag
grafnu Mar 29, 2022
a6447bc
Chaing info level for a few things
grafnu Mar 29, 2022
6b5802e
Notification cleanup
grafnu Mar 29, 2022
1960244
Debug test output
grafnu Mar 29, 2022
fda67ac
Fix test out location
grafnu Mar 29, 2022
665c39d
Fixing validator.out
grafnu Mar 29, 2022
46f41a8
Upgrade validator messages
grafnu Mar 30, 2022
c6f9f9a
Trying to fix validations
grafnu Mar 30, 2022
beff2f9
Merge branch 'master' into gatewayerror
grafnu Mar 30, 2022
dd60b5d
Fix for non-firebase world
grafnu Mar 30, 2022
52a5662
Make boolean
grafnu Mar 30, 2022
d76848e
Fix expected out dump
grafnu Mar 30, 2022
c08d402
Fix expected out
grafnu Mar 30, 2022
cf6553f
Logging cleanup
grafnu Mar 30, 2022
f40bd88
REstore validator.out
grafnu Mar 30, 2022
2011ce3
Sort validator out files
grafnu Mar 30, 2022
0345e2b
Fix sort order
grafnu Mar 30, 2022
1a85975
Fix expected out
grafnu Mar 30, 2022
72fb2b2
Merge branch 'gatewayerror' into validator
grafnu Mar 30, 2022
d1c8978
Merge branch 'master' into validator
grafnu Apr 1, 2022
a36e72b
Fix pubber disconnect error handling
grafnu Apr 1, 2022
298c225
FIx output clean
grafnu Apr 1, 2022
ae12797
Allow processing raw streams
grafnu Apr 1, 2022
d78cda3
Minor cleanup
grafnu Apr 1, 2022
59abda9
Fix expected out
grafnu Apr 1, 2022
3414e8b
Add additional enum value
grafnu Apr 1, 2022
efcb091
Cleanup
grafnu Apr 1, 2022
8cfb01c
Fix serial number test
grafnu Apr 1, 2022
973dc80
Adding in config update delay
grafnu Apr 2, 2022
ed7bf47
Add config update send delay
grafnu Apr 2, 2022
939489c
Change order of validator test
grafnu Apr 2, 2022
7aae9dd
Fix out order
grafnu Apr 2, 2022
699cdef
Fix out order
grafnu Apr 2, 2022
e49dfa1
Fix states handling
grafnu Apr 4, 2022
a353068
Clean up update handling
grafnu Apr 4, 2022
1c688d3
Fixup JS code
grafnu Apr 4, 2022
7bbef70
Fix update handling
grafnu Apr 4, 2022
31a127d
Fix typo
grafnu Apr 4, 2022
0614dd1
Fixing state query
grafnu Apr 4, 2022
e397a8a
Fix some update details
grafnu Apr 5, 2022
44c5edc
Fix sequencer
grafnu Apr 5, 2022
407532c
Fix validator
grafnu Apr 5, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .gencode_hash.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
477bfaa6fe5b2e55a2afaa3c3897e19a090c490f8a14344a84240420289e8ef6 gencode/docs/config.html
8221d216d08bc6b67f6f081132aa86f78934149bc5be23fe9132eb187d176a86 gencode/docs/envelope.html
6371d915647bc852edb7bc446637c2d43d3c3db49661f62d7c27d3d71c6671dd gencode/docs/envelope.html
559379abd267e9719a2bee98e56c21301853b6f69bc37fddc501acc57ce7cd63 gencode/docs/event_discovery.html
987503860562a3971314a98d75890b6c7615fee84bff6bede7010231f469c035 gencode/docs/event_pointset.html
e3fb2b1a96f6fcb06f5af6cff32829abc825065ed52de81c3d379c8c8070fe09 gencode/docs/event_system.html
Expand All @@ -23,7 +23,7 @@ b6ff9b8739a9c3bb6972f73db6fc54f451189c13b273e58bc11cb3d82c74ad40 gencode/java/u
eb3df3042d3c2008e51c35f35074741ba94a5a7fd590b5f1e59bd30ec19b4c2f gencode/java/udmi/schema/DiscoveryEvent.java
b9b1c6dc52c28630021c76d51305cb2fe634c557f3cf9b8e5c8c8abf456e6216 gencode/java/udmi/schema/DiscoveryState.java
090bbaf1508aa6ca8380af936af990673f300eb5a940c9e8ab94deb64efa2b7b gencode/java/udmi/schema/Entry.java
e22684e98a6dd9f213093c4e8293f353cd5faafb66264fc34a06167c6c3833a3 gencode/java/udmi/schema/Envelope.java
d5cd62caeb10e69d1c7099019a45f995c9b483061ef0832aac711a790b2023c8 gencode/java/udmi/schema/Envelope.java
e9f5c77be81486b6b8c6d88f70f2d50583d8c3fafa2ac09ead80f44b8d5e751e gencode/java/udmi/schema/Event.java
70ac42b1f93211420e8b40add27a4388dffcaaac60ead45852412aa815520605 gencode/java/udmi/schema/Families.java
aa0885ca43ab38c7597eacc38b7c512940a1a9fa061abd47d02c28e66b6fd93e gencode/java/udmi/schema/FamilyDiscoveryConfig.java
Expand Down
18 changes: 9 additions & 9 deletions .github/workflows/testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,23 +60,23 @@ jobs:
run: |
bin/setup_base
bin/clone_model
- name: telemetry validator
- name: sequence tests
env:
GCP_TARGET_PROJECT: ${{ secrets.GCP_TARGET_PROJECT }}
if: "${{ env.GCP_TARGET_PROJECT != '' }}"
run: |
bin/test_validator $GCP_TARGET_PROJECT
cat udmi_site_model/out/validation_report.json && echo
cat udmi_site_model/out/devices/AHU-1/state.json && echo
diff -u /tmp/validator.out etc/validator.out && echo No validator diff detected.
- name: sequence tests
bin/test_sequencer $GCP_TARGET_PROJECT
more /tmp/sequencer.out
diff -u /tmp/sequencer.out etc/sequencer.out && echo No output diff detected.
- name: telemetry validator
env:
GCP_TARGET_PROJECT: ${{ secrets.GCP_TARGET_PROJECT }}
if: "${{ env.GCP_TARGET_PROJECT != '' }}"
run: |
bin/test_sequencer $GCP_TARGET_PROJECT
cat /tmp/sequencer.out
diff -u /tmp/sequencer.out etc/sequencer.out && echo No output diff detected.
# Run after sequencer to device config starts in a known state
bin/test_validator $GCP_TARGET_PROJECT
more /tmp/validator.out
diff -u /tmp/validator.out etc/validator.out && echo No validator diff detected.
- name: code checks
if: ${{ always() }}
run: |
Expand Down
1 change: 0 additions & 1 deletion bin/loop_sequences
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ for test_name in $test_names; do
test_name=
fi
for test_class in ConfigValidator WritebackValidator; do
rm -f out/*.json
target=$test_class$test_prefix$test_name
CLASS=com.google.daq.mqtt.validator.validations.$target
echo $JAVA_CMD $CLASS
Expand Down
2 changes: 1 addition & 1 deletion dashboard/deploy_dashboard_gcloud
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ echo "const udmi_deploy_version = '$version';" > public/deploy_version.js
PUBSUB_FUNCTIONS="udmi_target udmi_state udmi_config udmi_reflect"
for func in $PUBSUB_FUNCTIONS; do
echo Deploying pubsub-trigger function $func...
gcloud functions deploy $func --trigger-topic=$func --runtime=$RUNTIME --project=$PROJECT --source=$SOURCE &
gcloud functions deploy $func --set-env-vars GCP_PROJECT=$PROJECT --trigger-topic=$func --runtime=$RUNTIME --project=$PROJECT --source=$SOURCE &
sleep 10
done

Expand Down
101 changes: 62 additions & 39 deletions dashboard/functions/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
const PROJECT_ID = process.env.GCP_PROJECT || process.env.GCLOUD_PROJECT;
const useFirestore = !!process.env.FIREBASE_CONFIG;
if (!process.env.GCLOUD_PROJECT) {
console.log("Setting GCLOUD_PROJECT to " + PROJECT_ID);
console.log('Setting GCLOUD_PROJECT to ' + PROJECT_ID);
process.env.GCLOUD_PROJECT = PROJECT_ID;
}

Expand All @@ -20,6 +20,8 @@ const UDMI_VERSION = '1.3.14';
const EVENT_TYPE = 'event';
const CONFIG_TYPE = 'config';
const STATE_TYPE = 'state';
const UPDATE_FOLDER = 'update';
const QUERY_FOLDER = 'query';

const ALL_REGIONS = ['us-central1', 'europe-west1', 'asia-east1'];
let registry_regions = null;
Expand Down Expand Up @@ -48,7 +50,6 @@ function recordMessage(attributes, message) {
const subFolder = attributes.subFolder || 'unknown';
const timestamp = (message && message.timestamp) || currentTimestamp();
const promises = [];
const collectionType = subType + 's';

if (message) {
message.timestamp = timestamp;
Expand All @@ -66,7 +67,7 @@ function recordMessage(attributes, message) {
promises.push(dev_doc.set({
'updated': timestamp
}, { merge: true }));
const config_doc = dev_doc.collection(collectionType).doc(subFolder);
const config_doc = dev_doc.collection(subType).doc(subFolder);
if (message) {
promises.push(config_doc.set(message));
} else {
Expand All @@ -81,7 +82,10 @@ function recordMessage(attributes, message) {
}

function sendCommand(registryId, deviceId, subFolder, message) {
const messageStr = JSON.stringify(message);
return sendCommandStr(registryId, deviceId, subFolder, JSON.stringify(message));
}

function sendCommandStr(registryId, deviceId, subFolder, messageStr) {
return registry_promise.then(() => {
return sendCommandSafe(registryId, deviceId, subFolder, messageStr);
});
Expand Down Expand Up @@ -159,24 +163,25 @@ exports.udmi_reflect = functions.pubsub.topic('udmi_reflect').onPublish((event)

attributes.deviceRegistryId = attributes.deviceId;
attributes.deviceId = parts[1];
const subType = parts[2];
attributes.subFolder = parts[3];
attributes.subFolder = parts[2];
attributes.subType = parts[3];
console.log('Reflect', attributes.deviceId, attributes.subType, attributes.subFolder);

return registry_promise.then(() => {
attributes.cloudRegion = registry_regions[attributes.deviceRegistryId];
if (subType == 'query') {
if (attributes.subFolder == QUERY_FOLDER) {
return udmi_query_event(attributes, msgObject);
}
target = 'udmi_' + subType;
target = 'udmi_' + attributes.subType;
return publishPubsubMessage(target, attributes, msgObject);
});
});

function udmi_query_event(attributes, msgObject) {
if (attributes.subFolder == 'states') {
if (attributes.subType == STATE_TYPE) {
return udmi_query_states(attributes);
}
throw 'Unknown query type ' + attributes.subFolder;
throw 'Unknown query type ' + attributes.subType;
}

function udmi_query_states(attributes) {
Expand All @@ -202,7 +207,7 @@ function udmi_query_states(attributes) {
const stateBinaryData = deviceData[0].state.binaryData;
const stateString = stateBinaryData.toString();
const msgObject = JSON.parse(stateString);
return process_states_update(attributes, msgObject);
return process_state_update(attributes, msgObject);
});
}

Expand All @@ -212,18 +217,21 @@ exports.udmi_state = functions.pubsub.topic('udmi_state').onPublish((event) => {
const msgString = Buffer.from(base64, 'base64').toString();
const msgObject = JSON.parse(msgString);

return process_states_update(attributes, msgObject);
return process_state_update(attributes, msgObject);
});

function process_states_update(attributes, msgObject) {
function process_state_update(attributes, msgObject) {
let promises = [];
const deviceId = attributes.deviceId;
const registryId = attributes.deviceRegistryId;

const commandFolder = `devices/${deviceId}/update/states`;
const commandFolder = `devices/${deviceId}/${STATE_TYPE}/${UPDATE_FOLDER}`;
promises.push(sendCommand(REFLECT_REGISTRY, registryId, commandFolder, msgObject));

attributes.subFolder = UPDATE_FOLDER;
attributes.subType = STATE_TYPE;
promises.push(publishPubsubMessage('udmi_target', attributes, msgObject));

for (var block in msgObject) {
let subMsg = msgObject[block];
if (typeof subMsg === 'object') {
Expand Down Expand Up @@ -263,37 +271,48 @@ exports.udmi_config = functions.pubsub.topic('udmi_config').onPublish((event) =>
if (useFirestore) {
console.info('Deferring to firestore trigger for IoT Core modification.');
} else {
promises.push(modify_device_config(registryId, deviceId, subFolder, msgObject));
promises.push(modify_device_config(registryId, deviceId, subFolder, currentTimestamp(), msgObject));
}

return Promise.all(promises);
});

async function modify_device_config(registryId, deviceId, subFolder, subContents) {
const [oldConfig, version] = await get_device_config(registryId, deviceId);
let newConfig = {};
function parse_old_config(oldConfig, resetConfig) {
if (!oldConfig || resetConfig) {
console.warn('Resetting config bock, explicit=' + resetConfig);
return {};
}

try {
const resetConfig = subFolder === "system" && subContents.extra_field === "reset_config";
if (!resetConfig && oldConfig) {
newConfig = JSON.parse(oldConfig);
} else {
console.log("Config reset explicit=" + resetConfig);
resetConfig && delete subContents.extra_field;
}
} catch (e) {
return JSON.parse(oldConfig);
} catch(e) {
console.warn('Previous config parse error, ignoring update');
return null;
}
}

async function modify_device_config(registryId, deviceId, subFolder, startTime, subContents) {
const [oldConfig, version] = await get_device_config(registryId, deviceId);

const resetConfig = subFolder == 'system' && subContents && subContents.extra_field == 'reset_config';
const newConfig = parse_old_config(oldConfig, resetConfig);
if (newConfig === null) {
return;
}

newConfig.version = UDMI_VERSION;
newConfig.timestamp = currentTimestamp();

console.log('Config modify version', version, subFolder);
console.log('Config modify version', subFolder, version, startTime);
if (subContents) {
delete subContents.version;
delete subContents.timestamp;
newConfig[subFolder] = subContents;
} else {
if (!newConfig[subFolder]) {
console.log('Config target already null', subFolder, version, startTime);
return;
}
delete newConfig[subFolder];
}
const attributes = {
Expand All @@ -303,13 +322,12 @@ async function modify_device_config(registryId, deviceId, subFolder, subContents
deviceRegistryId: registryId
};
return update_device_config(newConfig, attributes, version)
.catch(e => {
console.log('Config update rejected, retry', subFolder);
return modify_device_config(registryId, deviceId, subFolder, subContents);
})
.then(() => {
console.log('Config accepted version', version, subFolder);
});
console.log('Config accepted version', subFolder, version, startTime);
}).catch(e => {
console.log('Config update rejected', subFolder, version, startTime);
return modify_device_config(registryId, deviceId, subFolder, startTime, subContents);
})
}

async function get_device_config(registryId, deviceId) {
Expand Down Expand Up @@ -365,19 +383,23 @@ function update_device_config(message, attributes, preVersion) {
versionToUpdate: version,
binaryData: binaryData
};
const commandFolder = `devices/${deviceId}/update/configs`;
const commandFolder = `devices/${deviceId}/${CONFIG_TYPE}/${UPDATE_FOLDER}`;

return iotClient
.modifyCloudToDeviceConfig(request)
.then(() => sendCommand(REFLECT_REGISTRY, registryId, commandFolder, message));
.then(() => sendCommandStr(REFLECT_REGISTRY, registryId, commandFolder, msgString));
}

function consolidate_config(registryId, deviceId) {
function consolidate_config(registryId, deviceId, subFolder) {
const cloudRegion = registry_regions[registryId];
const reg_doc = firestore.collection('registries').doc(registryId);
const dev_doc = reg_doc.collection('devices').doc(deviceId);
const configs = dev_doc.collection('configs');

if (subFolder == UPDATE_FOLDER) {
return;
}

console.log('consolidating config for', registryId, deviceId);

const new_config = {
Expand Down Expand Up @@ -416,11 +438,12 @@ function consolidate_config(registryId, deviceId) {
}

exports.udmi_update = functions.firestore
.document('registries/{registryId}/devices/{deviceId}/configs/{subFolder}')
.document('registries/{registryId}/devices/{deviceId}/config/{subFolder}')
.onWrite((change, context) => {
const registryId = context.params.registryId;
const deviceId = constext.params.deviceId;
return registry_promise.then(consolidate_config(registryId, deviceId));
const deviceId = context.params.deviceId;
const subFolder = context.params.subFolder;
return registry_promise.then(consolidate_config(registryId, deviceId, subFolder));
});

function publishPubsubMessage(topicName, attributes, data) {
Expand Down
4 changes: 2 additions & 2 deletions etc/sequencer.out
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
RESULT pass broken_config Sequence complete
RESULT pass extra_config Sequence complete
RESULT pass provided_serial_no Sequence complete
RESULT pass provided_serial_no Sequence complete
RESULT pass system_last_update Sequence complete
RESULT pass valid_serial_no Sequence complete
RESULT pass valid_serial_no Sequence complete
RESULT skip writeback_states Missing 'invalid' target specification
17 changes: 0 additions & 17 deletions etc/validator.out
Original file line number Diff line number Diff line change
@@ -1,24 +1,7 @@
::::::::::::::
udmi_site_model/out/devices/AHU-1/state.out
::::::::::::::
While converting to json node: 2 schema violations found
2 schema violations found
instance value ("states") not found in enum (possible values: ["update","discovery","system","gateway","localnet","metadata","pointset","blobset"])
instance value ("update") not found in enum (possible values: ["event","command","state","config"])
While converting to json node: 2 schema violations found
2 schema violations found
object has missing required properties (["last_config"])
object has missing required properties (["version"])
::::::::::::::
udmi_site_model/out/devices/AHU-1/state_pointset.out
::::::::::::::
While converting to json node: 1 schema violations found
1 schema violations found
object instance has properties which are not allowed by the schema: ["version"]
::::::::::::::
udmi_site_model/out/devices/AHU-1/state_system.out
::::::::::::::
While converting to json node: 2 schema violations found
2 schema violations found
object has missing required properties (["last_config"])
object instance has properties which are not allowed by the schema: ["version"]
4 changes: 2 additions & 2 deletions gencode/docs/envelope.html

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions gencode/java/udmi/schema/Envelope.java

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions pubber/src/main/java/daq/pubber/MqttPublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ private void publishCore(String deviceId, String topic, Object data) {
warn(String.format("Publish failed for %s: %s", deviceId, e));
if (configuration.gatewayId == null) {
closeMqttClient(deviceId);
if (mqttClients.isEmpty()) {
warn("Last client closed, shutting down publisher");
publisherExecutor.shutdown();
}
} else {
close();
}
Expand Down Expand Up @@ -137,14 +141,10 @@ void close() {
}
mqttClients.keySet().forEach(this::closeMqttClient);
} catch (Exception e) {
throw new RuntimeException("While closing publisher");
throw new RuntimeException("While closing publisher", e);
}
}

long clientCount() {
return mqttClients.size();
}

private void validateCloudIotOptions() {
try {
checkNotNull(configuration.bridgeHostname, "bridgeHostname");
Expand Down
Loading