Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#1296] Swinging Door Trending (SDT) Filter Processor #1306

Merged
merged 6 commits into from
Feb 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.streampipes.processors.filters.jvm.processor.numericaltextfilter.NumericalTextFilterProcessor;
import org.apache.streampipes.processors.filters.jvm.processor.projection.ProjectionProcessor;
import org.apache.streampipes.processors.filters.jvm.processor.schema.MergeBySchemaProcessor;
import org.apache.streampipes.processors.filters.jvm.processor.sdt.SwingingDoorTrendingFilterProcessor;
import org.apache.streampipes.processors.filters.jvm.processor.textfilter.TextFilterProcessor;
import org.apache.streampipes.processors.filters.jvm.processor.threshold.ThresholdDetectionProcessor;
import org.apache.streampipes.processors.filters.jvm.processor.throughputmon.ThroughputMonitorProcessor;
Expand Down Expand Up @@ -67,7 +68,8 @@ public SpServiceDefinition provideServiceDefinition() {
new ComposeProcessor(),
new NumericalTextFilterProcessor(),
new RateLimitProcessor(),
new MovingAverageProcessor())
new MovingAverageProcessor(),
new SwingingDoorTrendingFilterProcessor())
.registerMessagingFormats(
new JsonDataFormatFactory(),
new CborDataFormatFactory(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.processors.filters.jvm.processor.sdt;

import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.wrapper.routing.SpOutputCollector;

public class SwingingDoorTrendingFilter {

/**
* the maximum absolute difference the user set if the data's value is within compressionDeviation, it
* will be compressed and discarded after compression, it will only store out of range (time, data) to form the trend
*/
private final double compressionDeviation;

/**
* the minimum time distance between two stored data points if current point time to the last
* stored point time distance <= compressionMinTimeInterval, current point will NOT be stored regardless of
* compression deviation
*/
private final long compressionMinTimeInterval;

/**
* the maximum time distance between two stored data points if current point time to the last
* stored point time distance >= compressionMaxTimeInterval, current point will be stored regardless of
* compression deviation
*/
private final long compressionMaxTimeInterval;

/**
* isFirstValue is true when the encoder takes the first point or reset() when cur point's
* distance to the last stored point's distance exceeds compressionMaxTimeInterval
*/
private boolean isFirstValue = true;

/**
* the maximum curUpperSlope between the lastStoredPoint to the current point upperDoor can only
* open up
*/
private double upperDoor = Integer.MIN_VALUE;

/**
* the minimum curLowerSlope between the lastStoredPoint to the current point lowerDoor can only
* open downward
*/
private double lowerDoor = Integer.MAX_VALUE;

/**
* the last read time and value if upperDoor >= lowerDoor meaning out of compressionDeviation range, will
* store lastReadPair
*/
private long lastReadTimestamp;
private double lastReadDouble;
private Event lastReadEvent;

/**
* the last stored time and value we compare current point against lastStoredPair
*/
private long lastStoredTimestamp;
private double lastStoredDouble;
private Event lastStoredEvent;

public SwingingDoorTrendingFilter(double compressionDeviation, long compressionMinTimeInterval,
long compressionMaxTimeInterval) {
this.compressionDeviation = compressionDeviation;
this.compressionMinTimeInterval = compressionMinTimeInterval;
this.compressionMaxTimeInterval = compressionMaxTimeInterval;
}

/**
* input a newly arrived event and output whether a new characteristic event is filtered
*
* @param time the timestamp extracted from the newly arrived event
* @param value the value extracted from the newly arrived event
* @param event the newly arrived event
* @return true if a new characteristic event is filtered
*/
public boolean filter(long time, double value, Event event) {
// store the first time and value pair
if (isFirstValue) {
isFirstValue = false;

lastReadTimestamp = time;
lastReadDouble = value;
lastReadEvent = event;

lastStoredTimestamp = time;
lastStoredDouble = value;
lastStoredEvent = event;

return true;
}

// if current point to the last stored point's time distance is within compressionMinTimeInterval,
// will not check two doors nor store any point within the compressionMinTimeInterval time range
if (time - lastStoredTimestamp <= compressionMinTimeInterval) {
return false;
}

// if current point to the last stored point's time distance is larger than compressionMaxTimeInterval,
// will reset two doors, and store current point;
if (time - lastStoredTimestamp >= compressionMaxTimeInterval) {
reset(time, value, event);
return true;
}

final double currentUpperSlope = (value - lastStoredDouble - compressionDeviation) / (time - lastStoredTimestamp);
if (currentUpperSlope > upperDoor) {
upperDoor = currentUpperSlope;
}

final double currentLowerSlope = (value - lastStoredDouble + compressionDeviation) / (time - lastStoredTimestamp);
if (currentLowerSlope < lowerDoor) {
lowerDoor = currentLowerSlope;
}

// current point to the lastStoredPair's value exceeds compDev, will store lastReadPair and
// update two doors
if (upperDoor >= lowerDoor) {
lastStoredTimestamp = lastReadTimestamp;
lastStoredDouble = lastReadDouble;
lastStoredEvent = lastReadEvent;

upperDoor = (value - lastStoredDouble - compressionDeviation) / (time - lastStoredTimestamp);
lowerDoor = (value - lastStoredDouble + compressionDeviation) / (time - lastStoredTimestamp);

lastReadDouble = value;
lastReadTimestamp = time;
lastReadEvent = event;

return true;
}

lastReadDouble = value;
lastReadTimestamp = time;
lastReadEvent = event;

return false;
}

/**
* output the recently filtered characteristic event to the collector
*
* @param collector the event collector
*/
public void forward(SpOutputCollector collector) {
collector.collect(lastStoredEvent);
}

/**
* if current point to the last stored point's time distance >= compressionMaxTimeInterval, will store current
* point and reset upperDoor and lowerDoor
*
* @param time current time
* @param value current value
* @param event current event
*/
private void reset(long time, double value, Event event) {
lastStoredTimestamp = time;
lastStoredDouble = value;
lastStoredEvent = event;

upperDoor = Integer.MIN_VALUE;
lowerDoor = Integer.MAX_VALUE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.processors.filters.jvm.processor.sdt;

import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.utils.Assets;
import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
import org.apache.streampipes.wrapper.routing.SpOutputCollector;
import org.apache.streampipes.wrapper.standalone.ProcessorParams;
import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;

public class SwingingDoorTrendingFilterProcessor extends StreamPipesDataProcessor {

private static final String SDT_TIMESTAMP_FIELD_KEY = "sdt-timestamp-field";
private static final String SDT_VALUE_FIELD_KEY = "sdt-value-field";
private static final String SDT_COMPRESSION_DEVIATION_KEY = "sdt-compression-deviation";
private static final String SDT_COMPRESSION_MIN_INTERVAL_KEY = "sdt-compression-min-interval";
private static final String SDT_COMPRESSION_MAX_INTERVAL_KEY = "sdt-compression-max-interval";

private String sdtTimestampField;
private String sdtValueField;

private float sdtCompressionDeviation = Float.MAX_VALUE;
private long sdtCompressionMinTimeInterval = 0L;
private long sdtCompressionMaxTimeInterval = Long.MAX_VALUE;

private SwingingDoorTrendingFilter sdtFilter;

@Override
public DataProcessorDescription declareModel() {
return ProcessingElementBuilder.create("org.apache.streampipes.processors.filters.jvm.sdt")
.category(DataProcessorType.FILTER)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
.withLocales(Locales.EN)
.requiredStream(StreamRequirementsBuilder.create()
.requiredPropertyWithUnaryMapping(EpRequirements.timestampReq(),
Labels.withId(SDT_TIMESTAMP_FIELD_KEY), PropertyScope.NONE)
.requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
Labels.withId(SDT_VALUE_FIELD_KEY), PropertyScope.NONE)
.build())
.requiredFloatParameter(Labels.withId(SDT_COMPRESSION_DEVIATION_KEY))
.requiredLongParameter(Labels.withId(SDT_COMPRESSION_MIN_INTERVAL_KEY), 0L)
.requiredLongParameter(Labels.withId(SDT_COMPRESSION_MAX_INTERVAL_KEY), Long.MAX_VALUE)
.outputStrategy(OutputStrategies.keep())
.build();
}

@Override
public void onInvocation(ProcessorParams parameters, SpOutputCollector spOutputCollector,
EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
// extract field names on stream
sdtTimestampField = parameters.extractor().mappingPropertyValue(SDT_TIMESTAMP_FIELD_KEY);
sdtValueField = parameters.extractor().mappingPropertyValue(SDT_VALUE_FIELD_KEY);

// extract & check sdt compression params
sdtCompressionDeviation = parameters.extractor().singleValueParameter(SDT_COMPRESSION_DEVIATION_KEY, Float.class);
sdtCompressionMinTimeInterval =
parameters.extractor().singleValueParameter(SDT_COMPRESSION_MIN_INTERVAL_KEY, Long.class);
sdtCompressionMaxTimeInterval =
parameters.extractor().singleValueParameter(SDT_COMPRESSION_MAX_INTERVAL_KEY, Long.class);
checkSdtCompressionParams();

sdtFilter = new SwingingDoorTrendingFilter(sdtCompressionDeviation, sdtCompressionMinTimeInterval,
sdtCompressionMaxTimeInterval);
}

/**
* @throws SpRuntimeException throw if the followings are not satisfied:
* 0 < sdtCompressionDeviation
* 0 <= sdtCompressionMinTimeInterval < sdtCompressionMaxTimeInterval <= Long.MAX_VALUE
*/
private void checkSdtCompressionParams() {
if (sdtCompressionDeviation <= 0) {
throw new SpRuntimeException(
String.format("Compression Deviation should be positive! Actual value: %f. ", sdtCompressionDeviation));
}

if (sdtCompressionMinTimeInterval < 0) {
throw new SpRuntimeException(
String.format("Compression Minimum Time Interval should be >= 0! Actual value: %d. ",
sdtCompressionMinTimeInterval));
}

if (sdtCompressionMaxTimeInterval <= sdtCompressionMinTimeInterval) {
throw new SpRuntimeException(
String.format(
"Compression Minimum Time Interval should be < Compression Maximum Time Interval! "
+ "Actual: Compression Minimum Time Interval(%d), Compression Maximum Time Interval(%d). ",
sdtCompressionMinTimeInterval, sdtCompressionMaxTimeInterval));
}
}

@Override
public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeException {
final long timestamp = event.getFieldBySelector(sdtTimestampField).getAsPrimitive().getAsLong();
final double value = event.getFieldBySelector(sdtValueField).getAsPrimitive().getAsDouble();

if (sdtFilter.filter(timestamp, value, event)) {
sdtFilter.forward(collector);
}
}

@Override
public void onDetach() throws SpRuntimeException {
// nothing to do
}
}
Loading