Skip to content

Commit

Permalink
[#1296] Swinging Door Trending (SDT) Filter Processor (#1306)
Browse files Browse the repository at this point in the history
* [#1285] Documentation: Swinging Door Trending (SDT) Filter Processor

* [#1285] SwingingDoorTrendingFilterProcessor: declareModel, onInvocation and onDetach

* [#1285] SwingingDoorTrendingFilter implementation

* [#1285] add outputStrategy in declareModel of SwingingDoorTrendingFilterProcessor

* [#1285] reduce uncessary description in string.en

* [#1285] add Javadoc for SwingingDoorTrendingFilter#filter & SwingingDoorTrendingFilter#forward
  • Loading branch information
SteveYurongSu authored Feb 20, 2023
1 parent c8318c5 commit 62037ab
Show file tree
Hide file tree
Showing 7 changed files with 449 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.streampipes.processors.filters.jvm.processor.numericaltextfilter.NumericalTextFilterProcessor;
import org.apache.streampipes.processors.filters.jvm.processor.projection.ProjectionProcessor;
import org.apache.streampipes.processors.filters.jvm.processor.schema.MergeBySchemaProcessor;
import org.apache.streampipes.processors.filters.jvm.processor.sdt.SwingingDoorTrendingFilterProcessor;
import org.apache.streampipes.processors.filters.jvm.processor.textfilter.TextFilterProcessor;
import org.apache.streampipes.processors.filters.jvm.processor.threshold.ThresholdDetectionProcessor;
import org.apache.streampipes.processors.filters.jvm.processor.throughputmon.ThroughputMonitorProcessor;
Expand Down Expand Up @@ -67,7 +68,8 @@ public SpServiceDefinition provideServiceDefinition() {
new ComposeProcessor(),
new NumericalTextFilterProcessor(),
new RateLimitProcessor(),
new MovingAverageProcessor())
new MovingAverageProcessor(),
new SwingingDoorTrendingFilterProcessor())
.registerMessagingFormats(
new JsonDataFormatFactory(),
new CborDataFormatFactory(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.processors.filters.jvm.processor.sdt;

import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.wrapper.routing.SpOutputCollector;

public class SwingingDoorTrendingFilter {

/**
* the maximum absolute difference the user set if the data's value is within compressionDeviation, it
* will be compressed and discarded after compression, it will only store out of range (time, data) to form the trend
*/
private final double compressionDeviation;

/**
* the minimum time distance between two stored data points if current point time to the last
* stored point time distance <= compressionMinTimeInterval, current point will NOT be stored regardless of
* compression deviation
*/
private final long compressionMinTimeInterval;

/**
* the maximum time distance between two stored data points if current point time to the last
* stored point time distance >= compressionMaxTimeInterval, current point will be stored regardless of
* compression deviation
*/
private final long compressionMaxTimeInterval;

/**
* isFirstValue is true when the encoder takes the first point or reset() when cur point's
* distance to the last stored point's distance exceeds compressionMaxTimeInterval
*/
private boolean isFirstValue = true;

/**
* the maximum curUpperSlope between the lastStoredPoint to the current point upperDoor can only
* open up
*/
private double upperDoor = Integer.MIN_VALUE;

/**
* the minimum curLowerSlope between the lastStoredPoint to the current point lowerDoor can only
* open downward
*/
private double lowerDoor = Integer.MAX_VALUE;

/**
* the last read time and value if upperDoor >= lowerDoor meaning out of compressionDeviation range, will
* store lastReadPair
*/
private long lastReadTimestamp;
private double lastReadDouble;
private Event lastReadEvent;

/**
* the last stored time and value we compare current point against lastStoredPair
*/
private long lastStoredTimestamp;
private double lastStoredDouble;
private Event lastStoredEvent;

public SwingingDoorTrendingFilter(double compressionDeviation, long compressionMinTimeInterval,
long compressionMaxTimeInterval) {
this.compressionDeviation = compressionDeviation;
this.compressionMinTimeInterval = compressionMinTimeInterval;
this.compressionMaxTimeInterval = compressionMaxTimeInterval;
}

/**
* input a newly arrived event and output whether a new characteristic event is filtered
*
* @param time the timestamp extracted from the newly arrived event
* @param value the value extracted from the newly arrived event
* @param event the newly arrived event
* @return true if a new characteristic event is filtered
*/
public boolean filter(long time, double value, Event event) {
// store the first time and value pair
if (isFirstValue) {
isFirstValue = false;

lastReadTimestamp = time;
lastReadDouble = value;
lastReadEvent = event;

lastStoredTimestamp = time;
lastStoredDouble = value;
lastStoredEvent = event;

return true;
}

// if current point to the last stored point's time distance is within compressionMinTimeInterval,
// will not check two doors nor store any point within the compressionMinTimeInterval time range
if (time - lastStoredTimestamp <= compressionMinTimeInterval) {
return false;
}

// if current point to the last stored point's time distance is larger than compressionMaxTimeInterval,
// will reset two doors, and store current point;
if (time - lastStoredTimestamp >= compressionMaxTimeInterval) {
reset(time, value, event);
return true;
}

final double currentUpperSlope = (value - lastStoredDouble - compressionDeviation) / (time - lastStoredTimestamp);
if (currentUpperSlope > upperDoor) {
upperDoor = currentUpperSlope;
}

final double currentLowerSlope = (value - lastStoredDouble + compressionDeviation) / (time - lastStoredTimestamp);
if (currentLowerSlope < lowerDoor) {
lowerDoor = currentLowerSlope;
}

// current point to the lastStoredPair's value exceeds compDev, will store lastReadPair and
// update two doors
if (upperDoor >= lowerDoor) {
lastStoredTimestamp = lastReadTimestamp;
lastStoredDouble = lastReadDouble;
lastStoredEvent = lastReadEvent;

upperDoor = (value - lastStoredDouble - compressionDeviation) / (time - lastStoredTimestamp);
lowerDoor = (value - lastStoredDouble + compressionDeviation) / (time - lastStoredTimestamp);

lastReadDouble = value;
lastReadTimestamp = time;
lastReadEvent = event;

return true;
}

lastReadDouble = value;
lastReadTimestamp = time;
lastReadEvent = event;

return false;
}

/**
* output the recently filtered characteristic event to the collector
*
* @param collector the event collector
*/
public void forward(SpOutputCollector collector) {
collector.collect(lastStoredEvent);
}

/**
* if current point to the last stored point's time distance >= compressionMaxTimeInterval, will store current
* point and reset upperDoor and lowerDoor
*
* @param time current time
* @param value current value
* @param event current event
*/
private void reset(long time, double value, Event event) {
lastStoredTimestamp = time;
lastStoredDouble = value;
lastStoredEvent = event;

upperDoor = Integer.MIN_VALUE;
lowerDoor = Integer.MAX_VALUE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.processors.filters.jvm.processor.sdt;

import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.utils.Assets;
import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
import org.apache.streampipes.wrapper.routing.SpOutputCollector;
import org.apache.streampipes.wrapper.standalone.ProcessorParams;
import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;

public class SwingingDoorTrendingFilterProcessor extends StreamPipesDataProcessor {

private static final String SDT_TIMESTAMP_FIELD_KEY = "sdt-timestamp-field";
private static final String SDT_VALUE_FIELD_KEY = "sdt-value-field";
private static final String SDT_COMPRESSION_DEVIATION_KEY = "sdt-compression-deviation";
private static final String SDT_COMPRESSION_MIN_INTERVAL_KEY = "sdt-compression-min-interval";
private static final String SDT_COMPRESSION_MAX_INTERVAL_KEY = "sdt-compression-max-interval";

private String sdtTimestampField;
private String sdtValueField;

private float sdtCompressionDeviation = Float.MAX_VALUE;
private long sdtCompressionMinTimeInterval = 0L;
private long sdtCompressionMaxTimeInterval = Long.MAX_VALUE;

private SwingingDoorTrendingFilter sdtFilter;

@Override
public DataProcessorDescription declareModel() {
return ProcessingElementBuilder.create("org.apache.streampipes.processors.filters.jvm.sdt")
.category(DataProcessorType.FILTER)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
.withLocales(Locales.EN)
.requiredStream(StreamRequirementsBuilder.create()
.requiredPropertyWithUnaryMapping(EpRequirements.timestampReq(),
Labels.withId(SDT_TIMESTAMP_FIELD_KEY), PropertyScope.NONE)
.requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
Labels.withId(SDT_VALUE_FIELD_KEY), PropertyScope.NONE)
.build())
.requiredFloatParameter(Labels.withId(SDT_COMPRESSION_DEVIATION_KEY))
.requiredLongParameter(Labels.withId(SDT_COMPRESSION_MIN_INTERVAL_KEY), 0L)
.requiredLongParameter(Labels.withId(SDT_COMPRESSION_MAX_INTERVAL_KEY), Long.MAX_VALUE)
.outputStrategy(OutputStrategies.keep())
.build();
}

@Override
public void onInvocation(ProcessorParams parameters, SpOutputCollector spOutputCollector,
EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
// extract field names on stream
sdtTimestampField = parameters.extractor().mappingPropertyValue(SDT_TIMESTAMP_FIELD_KEY);
sdtValueField = parameters.extractor().mappingPropertyValue(SDT_VALUE_FIELD_KEY);

// extract & check sdt compression params
sdtCompressionDeviation = parameters.extractor().singleValueParameter(SDT_COMPRESSION_DEVIATION_KEY, Float.class);
sdtCompressionMinTimeInterval =
parameters.extractor().singleValueParameter(SDT_COMPRESSION_MIN_INTERVAL_KEY, Long.class);
sdtCompressionMaxTimeInterval =
parameters.extractor().singleValueParameter(SDT_COMPRESSION_MAX_INTERVAL_KEY, Long.class);
checkSdtCompressionParams();

sdtFilter = new SwingingDoorTrendingFilter(sdtCompressionDeviation, sdtCompressionMinTimeInterval,
sdtCompressionMaxTimeInterval);
}

/**
* @throws SpRuntimeException throw if the followings are not satisfied:
* 0 < sdtCompressionDeviation
* 0 <= sdtCompressionMinTimeInterval < sdtCompressionMaxTimeInterval <= Long.MAX_VALUE
*/
private void checkSdtCompressionParams() {
if (sdtCompressionDeviation <= 0) {
throw new SpRuntimeException(
String.format("Compression Deviation should be positive! Actual value: %f. ", sdtCompressionDeviation));
}

if (sdtCompressionMinTimeInterval < 0) {
throw new SpRuntimeException(
String.format("Compression Minimum Time Interval should be >= 0! Actual value: %d. ",
sdtCompressionMinTimeInterval));
}

if (sdtCompressionMaxTimeInterval <= sdtCompressionMinTimeInterval) {
throw new SpRuntimeException(
String.format(
"Compression Minimum Time Interval should be < Compression Maximum Time Interval! "
+ "Actual: Compression Minimum Time Interval(%d), Compression Maximum Time Interval(%d). ",
sdtCompressionMinTimeInterval, sdtCompressionMaxTimeInterval));
}
}

@Override
public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeException {
final long timestamp = event.getFieldBySelector(sdtTimestampField).getAsPrimitive().getAsLong();
final double value = event.getFieldBySelector(sdtValueField).getAsPrimitive().getAsDouble();

if (sdtFilter.filter(timestamp, value, event)) {
sdtFilter.forward(collector);
}
}

@Override
public void onDetach() throws SpRuntimeException {
// nothing to do
}
}
Loading

0 comments on commit 62037ab

Please sign in to comment.