From 27a9fd66181aae705a11049150e2bbc744be15f1 Mon Sep 17 00:00:00 2001 From: Zhang Ji <125540670@qq.com> Date: Thu, 23 Jan 2025 18:57:15 +0800 Subject: [PATCH] =?UTF-8?q?feat(=E5=9C=BA=E6=99=AF=E8=81=94=E5=8A=A8):=20?= =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E8=AE=BE=E5=A4=87=E6=95=B0=E6=8D=AE=E6=89=A7?= =?UTF-8?q?=E8=A1=8C=E5=8A=A8=E4=BD=9C=EF=BC=8C=E6=89=A9=E5=B1=95=E6=95=B0?= =?UTF-8?q?=E7=BB=84=E6=9D=A1=E4=BB=B6=EF=BC=8C=E4=BC=98=E5=8C=96=E5=9B=BD?= =?UTF-8?q?=E9=99=85=E5=8C=96=20(#605)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- jetlinks-components/common-component/pom.xml | 5 + .../configuration/CommonConfiguration.java | 27 ++ .../DataTypeJSONDeserializer.java | 26 ++ .../aggregation/AggregationSupport.java | 35 ++ .../InternalAggregationSupports.java | 82 ++++ .../reactorql/function/FunctionInfo.java | 25 ++ .../reactorql/function/FunctionSupport.java | 72 +++ .../function/InternalFunctionSupport.java | 70 +++ .../reactorql/impl/ComplexExistsFunction.java | 424 ++++++++++++++++++ .../reactorql/term/ExistsTermSupport.java | 73 +++ .../reactorql/term/TermTypeSupport.java | 13 + .../community/reactorql/term/TermTypes.java | 4 + .../community/utils/ReactorUtils.java | 8 + .../relation/utils/VariableSource.java | 19 + .../commons/TermsConditionEvaluator.java | 58 ++- .../engine/commons/things/ThingInfoSpec.java | 265 +++++++++++ .../RuleEngineConfiguration.java | 9 + .../DeviceDataTaskExecutorProvider.java | 144 ++++++ .../subscriber/providers/AlarmProvider.java | 47 +- ...rProvider.java => AlarmSceneProvider.java} | 8 +- .../rule/engine/alarm/AlarmTarget.java | 4 + .../alarm/AlarmTaskExecutorProvider.java | 1 - .../alarm/CustomAlarmTargetSupplier.java | 6 - .../rule/engine/alarm/DeviceAlarmTarget.java | 9 +- .../rule/engine/alarm/ProductAlarmTarget.java | 8 +- ...AlarmTarget.java => SceneAlarmTarget.java} | 12 +- .../AlarmTargetConfiguration.java | 26 ++ .../rule/engine/scene/DeviceOperation.java | 83 +--- .../engine/scene/SceneActionProvider.java | 47 +- .../rule/engine/scene/SceneRule.java | 4 +- .../engine/scene/SceneTriggerProvider.java | 6 + .../rule/engine/scene/SceneUtils.java | 67 ++- .../community/rule/engine/scene/Trigger.java | 9 +- .../internal/actions/AlarmActionProvider.java | 7 + .../actions/DeviceDataActionProvider.java | 99 ++++ .../internal/triggers/DeviceTrigger.java | 6 +- .../triggers/DeviceTriggerProvider.java | 5 + .../rule/engine/scene/term/TermColumn.java | 4 + .../rule/engine/scene/value/TermValue.java | 28 +- .../rule/engine/utils/TermColumnUtils.java | 178 ++++++++ .../engine/web/AlarmConfigController.java | 13 +- .../engine/web/AlarmHistoryController.java | 26 ++ .../engine/web/AlarmRecordController.java | 65 ++- .../rule/engine/web/SceneController.java | 41 ++ .../request/ArrayChildTermColumnRequest.java | 24 + .../web/response/AlarmTargetTypeInfo.java | 13 + .../engine/web/response/SceneActionInfo.java | 55 +++ .../web/response/SceneAggregationInfo.java | 38 ++ .../engine/web/response/SceneTriggerInfo.java | 44 ++ ...ot.autoconfigure.AutoConfiguration.imports | 3 +- .../messages_en.properties | 66 ++- .../messages_zh.properties | 64 ++- pom.xml | 2 +- 53 files changed, 2332 insertions(+), 145 deletions(-) create mode 100644 jetlinks-components/common-component/src/main/java/org/jetlinks/community/configuration/DataTypeJSONDeserializer.java create mode 100644 jetlinks-components/common-component/src/main/java/org/jetlinks/community/reactorql/aggregation/AggregationSupport.java create mode 100644 jetlinks-components/common-component/src/main/java/org/jetlinks/community/reactorql/aggregation/InternalAggregationSupports.java create mode 100644 jetlinks-components/common-component/src/main/java/org/jetlinks/community/reactorql/function/FunctionInfo.java create mode 100644 jetlinks-components/common-component/src/main/java/org/jetlinks/community/reactorql/function/FunctionSupport.java create mode 100644 jetlinks-components/common-component/src/main/java/org/jetlinks/community/reactorql/function/InternalFunctionSupport.java create mode 100644 jetlinks-components/common-component/src/main/java/org/jetlinks/community/reactorql/impl/ComplexExistsFunction.java create mode 100644 jetlinks-components/common-component/src/main/java/org/jetlinks/community/reactorql/term/ExistsTermSupport.java create mode 100644 jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/commons/things/ThingInfoSpec.java create mode 100644 jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/device/DeviceDataTaskExecutorProvider.java rename jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/providers/{AlarmOtherProvider.java => AlarmSceneProvider.java} (80%) rename jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/{OtherAlarmTarget.java => SceneAlarmTarget.java} (56%) create mode 100644 jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/configuration/AlarmTargetConfiguration.java create mode 100644 jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/internal/actions/DeviceDataActionProvider.java create mode 100644 jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/utils/TermColumnUtils.java create mode 100644 jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/request/ArrayChildTermColumnRequest.java create mode 100644 jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/response/SceneActionInfo.java create mode 100644 jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/response/SceneAggregationInfo.java create mode 100644 jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/response/SceneTriggerInfo.java diff --git a/jetlinks-components/common-component/pom.xml b/jetlinks-components/common-component/pom.xml index 4688ccc47..1244c2949 100644 --- a/jetlinks-components/common-component/pom.xml +++ b/jetlinks-components/common-component/pom.xml @@ -19,6 +19,11 @@ ${jetlinks.version} + + org.jetlinks + jetlinks-supports + + org.hswebframework.web hsweb-authorization-api diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/configuration/CommonConfiguration.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/configuration/CommonConfiguration.java index 4fc77fbe1..b125c2ab6 100644 --- a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/configuration/CommonConfiguration.java +++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/configuration/CommonConfiguration.java @@ -20,6 +20,8 @@ import org.jetlinks.community.config.SimpleConfigManager; import org.jetlinks.community.config.entity.ConfigEntity; import org.jetlinks.community.dictionary.DictionaryJsonDeserializer; +import org.jetlinks.community.reactorql.aggregation.InternalAggregationSupports; +import org.jetlinks.community.reactorql.function.InternalFunctionSupport; import org.jetlinks.community.reference.DataReferenceManager; import org.jetlinks.community.reference.DataReferenceProvider; import org.jetlinks.community.reference.DefaultDataReferenceManager; @@ -32,10 +34,12 @@ import org.jetlinks.community.utils.TimeUtils; import org.jetlinks.core.event.EventBus; import org.jetlinks.core.metadata.DataType; +import org.jetlinks.core.metadata.types.DataTypes; import org.jetlinks.core.rpc.RpcManager; import org.jetlinks.reactor.ql.feature.Feature; import org.jetlinks.reactor.ql.supports.DefaultReactorQLMetadata; import org.jetlinks.reactor.ql.utils.CastUtils; +import org.jetlinks.supports.official.JetLinksDataTypeCodecs; import org.springframework.beans.BeansException; import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.config.BeanPostProcessor; @@ -47,6 +51,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.core.ReactiveRedisOperations; import org.springframework.http.MediaType; +import org.springframework.util.StringUtils; import org.springframework.util.unit.DataSize; import reactor.core.Exceptions; import reactor.core.publisher.Hooks; @@ -59,6 +64,7 @@ import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoUnit; import java.util.Date; +import java.util.Map; @Configuration @SuppressWarnings("all") @@ -66,6 +72,9 @@ public class CommonConfiguration { static { + InternalAggregationSupports.register(); + InternalFunctionSupport.register(); + BeanUtilsBean.getInstance().getConvertUtils().register(new Converter() { @Override public T convert(Class aClass, Object o) { @@ -149,6 +158,23 @@ public T convert(Class type, Object value) { } }, EnumDict.class); + BeanUtilsBean.getInstance().getConvertUtils().register(new Converter() { + @Override + @Generated + public T convert(Class type, Object value) { + if (value instanceof Map) { + Map map = ((Map) value); + String typeId = (String) map.get("type"); + if (StringUtils.isEmpty(typeId)) { + return null; + } + return (T) JetLinksDataTypeCodecs.decode(DataTypes.lookup(typeId).get(), map); + } + return null; + + } + }, DataType.class); + //捕获jvm错误,防止Flux被挂起 Hooks.onOperatorError((err, val) -> { if (Exceptions.isJvmFatal(err)) { @@ -180,6 +206,7 @@ public Object postProcessAfterInitialization(@Nonnull Object bean, @Nonnull Stri @Bean public Jackson2ObjectMapperBuilderCustomizer jackson2ObjectMapperBuilderCustomizer(){ return builder->{ + builder.deserializerByType(DataType.class, new DataTypeJSONDeserializer()); builder.deserializerByType(Date.class,new SmartDateDeserializer()); builder.deserializerByType(EnumDict.class, new DictionaryJsonDeserializer()); }; diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/configuration/DataTypeJSONDeserializer.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/configuration/DataTypeJSONDeserializer.java new file mode 100644 index 000000000..741f693c3 --- /dev/null +++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/configuration/DataTypeJSONDeserializer.java @@ -0,0 +1,26 @@ +package org.jetlinks.community.configuration; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import org.apache.commons.beanutils.BeanUtilsBean; +import org.jetlinks.core.metadata.DataType; + +import java.io.IOException; +import java.util.Map; + +/** + * + * @author zhangji 2025/1/23 + * @since 2.3 + */ +public class DataTypeJSONDeserializer extends JsonDeserializer { + @Override + public DataType deserialize(JsonParser parser, DeserializationContext ctxt) throws IOException, JsonProcessingException { + + Map map= ctxt.readValue(parser, Map.class); + + return (DataType) BeanUtilsBean.getInstance().getConvertUtils().convert(map, DataType.class); + } +} diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/reactorql/aggregation/AggregationSupport.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/reactorql/aggregation/AggregationSupport.java new file mode 100644 index 000000000..74b22c466 --- /dev/null +++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/reactorql/aggregation/AggregationSupport.java @@ -0,0 +1,35 @@ +package org.jetlinks.community.reactorql.aggregation; + +import org.hswebframework.ezorm.rdb.operator.builder.fragments.SqlFragments; +import org.hswebframework.ezorm.rdb.operator.dml.FunctionColumn; +import org.jetlinks.community.spi.Provider; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Mono; + +import java.util.function.Function; + +/** + * 聚合函数支持. + * + * @author zhangji 2025/1/22 + * @since 2.3 + */ +public interface AggregationSupport extends Function, Mono> { + + Provider supports = Provider.create(AggregationSupport.class); + + String getId(); + + String getName(); + + SqlFragments createSql(FunctionColumn column); + + + static AggregationSupport getNow(String id) { + return AggregationSupport.supports + .get(id.toUpperCase()) + .orElseGet(() -> AggregationSupport.supports + .get(id.toLowerCase()) + .orElseGet(() -> AggregationSupport.supports.getNow(id))); + } +} diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/reactorql/aggregation/InternalAggregationSupports.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/reactorql/aggregation/InternalAggregationSupports.java new file mode 100644 index 000000000..50e648005 --- /dev/null +++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/reactorql/aggregation/InternalAggregationSupports.java @@ -0,0 +1,82 @@ +package org.jetlinks.community.reactorql.aggregation; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import org.hswebframework.ezorm.rdb.operator.builder.fragments.BatchSqlFragments; +import org.hswebframework.ezorm.rdb.operator.builder.fragments.SqlFragments; +import org.hswebframework.ezorm.rdb.operator.dml.FunctionColumn; +import org.jetlinks.reactor.ql.supports.agg.MapAggFeature; +import org.jetlinks.reactor.ql.utils.CastUtils; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.math.MathFlux; + +import java.util.Comparator; +import java.util.function.Function; + +import static org.jetlinks.reactor.ql.supports.DefaultReactorQLMetadata.addGlobal; + +/** + * + * @author zhangji 2025/1/22 + * @since 2.3 + */ +@AllArgsConstructor +public enum InternalAggregationSupports implements AggregationSupport { + COUNT("总数", Flux::count, 0), + //去重计数 + DISTINCT_COUNT("总数(去重)", flux -> flux.distinct().count(), 0) { + @Override + public SqlFragments createSql(FunctionColumn column) { + return new BatchSqlFragments().addSql("count(distinct ", column.getColumn(), ")"); + } + }, + MIN("最小值", + numberFlux -> MathFlux.min(numberFlux.map(CastUtils::castNumber), Comparator.comparing(Number::doubleValue)), null), + MAX("最大值", numberFlux -> MathFlux.max(numberFlux.map(CastUtils::castNumber), Comparator.comparing(Number::doubleValue)), null), + AVG("平均值", numberFlux -> MathFlux.averageDouble(numberFlux.map(CastUtils::castNumber), Number::doubleValue), null), + SUM("总和", numberFlux -> MathFlux.sumDouble(numberFlux.map(CastUtils::castNumber), Number::doubleValue), 0), + + FIRST("第一个值", numberFlux -> numberFlux.take(1).singleOrEmpty(), null), + LAST("最后一个值", numberFlux -> numberFlux.takeLast(1).singleOrEmpty(), null), + +// MEDIAN("中位数", numberFlux -> Mono.empty(), null),//中位数 +// SPREAD("极差", numberFlux -> Mono.empty(), null),//差值 +// STDDEV("标准差", numberFlux -> Mono.empty(), null),//标准差 + ; + + static { + for (InternalAggregationSupports value : values()) { + addGlobal(new MapAggFeature(value.getId(), value::apply)); + AggregationSupport.supports.register(value.getId(), value); + } + } + + public static void register(){ + + } + + @Getter + private final String name; + + private final Function, Mono> computer; + @Getter + private final Object defaultValue; + + @Override + public SqlFragments createSql(FunctionColumn column) { + return new BatchSqlFragments() + .addSql(name() + "(").addSql(column.getColumn()).addSql(")"); + } + + @Override + public String getId() { + return name(); + } + + @Override + public Mono apply(Publisher publisher) { + return computer.apply(Flux.from(publisher)); + } +} diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/reactorql/function/FunctionInfo.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/reactorql/function/FunctionInfo.java new file mode 100644 index 000000000..3768f920a --- /dev/null +++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/reactorql/function/FunctionInfo.java @@ -0,0 +1,25 @@ +package org.jetlinks.community.reactorql.function; + +import lombok.Getter; +import lombok.Setter; +import org.jetlinks.core.metadata.DataType; +import org.jetlinks.core.metadata.PropertyMetadata; + +import java.util.List; + +/** + * + * @author zhangji 2025/1/22 + * @since 2.3 + */ +@Getter +@Setter +public class FunctionInfo { + private String id; + + private String name; + + private DataType outputType; + + private List args; +} diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/reactorql/function/FunctionSupport.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/reactorql/function/FunctionSupport.java new file mode 100644 index 000000000..87b1b1c3a --- /dev/null +++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/reactorql/function/FunctionSupport.java @@ -0,0 +1,72 @@ +package org.jetlinks.community.reactorql.function; + +import org.hswebframework.ezorm.rdb.operator.builder.fragments.SqlFragments; +import org.jetlinks.core.metadata.DataType; +import org.jetlinks.community.spi.Provider; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * 函数支持,用于定义在可以在ReactorQL中使用的函数. + * + * @author zhangji 2025/1/22 + * @since 2.3 + */ +public interface FunctionSupport { + + Provider supports = Provider.create(FunctionSupport.class); + + String getId(); + + String getName(); + + /** + * 是否支持列的数据类型 + * + * @param type 数据类型 + * @return 是否支持 + */ + boolean isSupported(DataType type); + + /** + * 获取输出数据类型 + * + * @return 输出数据类型 + */ + DataType getOutputType(); + + /** + * 创建SQL函数片段 + * + * @param column 列名 + * @param args 参数 + * @return SQL函数片段 + */ + SqlFragments createSql(String column, Map args); + + /** + * 查找支持的函数 + * + * @param type 数据类型 + * @return 函数信息 + */ + static List lookup(DataType type) { + return supports + .getAll() + .stream() + .filter(support -> support.isSupported(type)) + .map(FunctionSupport::toInfo) + .collect(Collectors.toList()); + } + + + default FunctionInfo toInfo() { + FunctionInfo info = new FunctionInfo(); + info.setId(getId()); + info.setOutputType(getOutputType()); + info.setName(getName()); + return info; + } +} diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/reactorql/function/InternalFunctionSupport.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/reactorql/function/InternalFunctionSupport.java new file mode 100644 index 000000000..afbd27989 --- /dev/null +++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/reactorql/function/InternalFunctionSupport.java @@ -0,0 +1,70 @@ +package org.jetlinks.community.reactorql.function; + +import com.google.common.collect.Sets; +import lombok.AllArgsConstructor; +import org.hswebframework.ezorm.rdb.operator.builder.fragments.SqlFragments; +import org.jetlinks.core.metadata.DataType; +import org.jetlinks.core.metadata.types.ArrayType; +import org.jetlinks.core.metadata.types.LongType; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +/** + * + * @author zhangji 2025/1/22 + * @since 2.3 + */ +@AllArgsConstructor +public enum InternalFunctionSupport implements FunctionSupport { + + array_len("集合长度", LongType.GLOBAL, ArrayType.ID); + + private final String name; + private final DataType outputType; + private final Set supportTypes; + + static { + for (InternalFunctionSupport value : values()) { + InternalFunctionSupport.supports.register(value.getId(), value); + } + } + + public static void register(){ + + } + + InternalFunctionSupport(String name, DataType outputType, String... supportTypes) { + this.name = name; + this.outputType = outputType; + this.supportTypes = Collections.unmodifiableSet( + Sets.newHashSet(supportTypes) + ); + } + + @Override + public String getId() { + return name(); + } + + @Override + public String getName() { + return name; + } + + @Override + public boolean isSupported(DataType type) { + return supportTypes.contains(type.getId()); + } + + @Override + public DataType getOutputType() { + return outputType; + } + + @Override + public SqlFragments createSql(String column, Map args) { + return SqlFragments.of(getId() + "(", column, ")"); + } +} diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/reactorql/impl/ComplexExistsFunction.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/reactorql/impl/ComplexExistsFunction.java new file mode 100644 index 000000000..8ca39ec05 --- /dev/null +++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/reactorql/impl/ComplexExistsFunction.java @@ -0,0 +1,424 @@ +package org.jetlinks.community.reactorql.impl; + +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import com.google.common.collect.Maps; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; +import net.sf.jsqlparser.expression.*; +import net.sf.jsqlparser.expression.operators.relational.ExpressionList; +import net.sf.jsqlparser.statement.select.Select; +import net.sf.jsqlparser.statement.select.SubSelect; +import org.apache.commons.collections4.CollectionUtils; +import org.hswebframework.ezorm.core.param.Term; +import org.hswebframework.ezorm.rdb.executor.SqlRequest; +import org.hswebframework.ezorm.rdb.executor.SqlRequests; +import org.hswebframework.ezorm.rdb.operator.builder.fragments.BatchSqlFragments; +import org.hswebframework.ezorm.rdb.operator.builder.fragments.EmptySqlFragments; +import org.hswebframework.ezorm.rdb.operator.builder.fragments.SqlFragments; +import org.hswebframework.ezorm.rdb.operator.dml.FunctionColumn; +import org.hswebframework.ezorm.rdb.operator.dml.FunctionTerm; +import org.hswebframework.web.api.crud.entity.TermExpressionParser; +import org.hswebframework.web.bean.FastBeanCopier; +import org.jetlinks.community.reactorql.aggregation.AggregationSupport; +import org.jetlinks.community.utils.ConverterUtils; +import org.jetlinks.community.utils.ReactorUtils; +import org.jetlinks.core.metadata.Jsonable; +import org.jetlinks.reactor.ql.DefaultReactorQLContext; +import org.jetlinks.reactor.ql.ReactorQL; +import org.jetlinks.reactor.ql.ReactorQLMetadata; +import org.jetlinks.reactor.ql.ReactorQLRecord; +import org.jetlinks.reactor.ql.feature.FeatureId; +import org.jetlinks.reactor.ql.feature.ValueMapFeature; +import org.jetlinks.reactor.ql.supports.DefaultReactorQLMetadata; +import org.jetlinks.reactor.ql.utils.CastUtils; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * + * @author zhangji 2025/1/22 + * @since 2.3 + */ +public class ComplexExistsFunction implements ValueMapFeature { + + //集合中的元素 + public static final String COL_ELEMENT = "_element"; + //集合自身 + public static final String COL_SELF = "_self"; + //原始行 + public static final String COL_ROW = "_row"; + + public static final ComplexExistsFunction INSTANCE = new ComplexExistsFunction(); + + public static final String function = "complex_exists"; + + static { + DefaultReactorQLMetadata.addGlobal(INSTANCE); + } + + public static void register() { + + } + + public static ExistsSpec createExistsSpec(Object val) { + if (val instanceof ExistsSpec) { + return ((ExistsSpec) val); + } + if (val instanceof Map) { + ExistsSpec spec = new ExistsSpec(); + spec.fromJson(new JSONObject((Map) val)); + return spec; + } + if (val instanceof String) { + ExistsSpec spec = new ExistsSpec(); + spec.setFilter(TermExpressionParser.parse(val.toString())); + return spec; + } + return FastBeanCopier.copy(val, new ExistsSpec()); + } + + private static ExistsProcessor createExprProcessor(Expression expr) { + String sql; + if (expr instanceof Select) { + sql = expr.toString(); + } else if (expr instanceof SubSelect) { + sql = ((SubSelect) expr).getSelectBody().toString(); + } else if (expr instanceof BinaryExpression) { + sql = "select 1 from t where " + expr; + } else { + throw new UnsupportedOperationException("不支持的表达式:" + expr); + } + SqlRequest request = SqlRequests.of(sql); + ReactorQL ql = ReactorQL + .builder() + .sql(request.getSql()) + .build(); + return new ReactorQLProcessor(request, ql); + } + + private static ExistsProcessor createExprProcessor(String str) { + if (str.startsWith("select") || str.startsWith("SELECT")) { + SqlRequest request = SqlRequests.of(str); + ReactorQL ql = ReactorQL + .builder() + .sql(request.getSql()) + .build(); + return new ReactorQLProcessor(request, ql); + } + ExistsSpec spec = new ExistsSpec(); + spec.setFilter(TermExpressionParser.parse(str)); + return spec.compile(); + } + + @Override + public Function> createMapper(Expression expression, ReactorQLMetadata metadata) { + + net.sf.jsqlparser.expression.Function func = ((net.sf.jsqlparser.expression.Function) expression); + ExpressionList args = func.getParameters(); + if (args == null || args.getExpressions() == null || args.getExpressions().size() < 2) { + throw new UnsupportedOperationException("complex_exists函数参数错误"); + } + Function> processorMapper; + + List>> mappers = args + .getExpressions() + .stream() + .skip(1) + .map(expr -> ValueMapFeature.createMapperNow(expr, metadata)) + .collect(Collectors.toList()); + + Expression firstExpr = args.getExpressions().get(0); + //以字符串来定义 complex_exists('name is test') + if (firstExpr instanceof StringValue) { + Mono processor = Mono.just(createExprProcessor(((StringValue) firstExpr).getValue())); + processorMapper = record -> processor; + } else if (firstExpr instanceof NumericBind) { + int argIndex = ((NumericBind) firstExpr).getBindId() - 1; + processorMapper = record -> Mono.justOrEmpty( + record.getContext().getParameter(argIndex).map(this::transformProcessor) + ); + } else if (firstExpr instanceof JdbcParameter) { + int argIndex = ((JdbcParameter) firstExpr).getIndex() - 1; + processorMapper = record -> Mono.justOrEmpty( + record.getContext().getParameter(argIndex).map(this::transformProcessor) + ); + } + // complex_exists(select 1 from dual where ,arr) + else if (firstExpr instanceof JdbcNamedParameter) { + String name = ((JdbcNamedParameter) firstExpr).getName(); + processorMapper = record -> Mono.justOrEmpty( + transformProcessor(record.getContext().getParameter(name)) + ); + } else { + Mono processor = Mono.just(createExprProcessor(firstExpr)); + processorMapper = record -> processor; + } + + return record -> + processorMapper + .apply(record) + .flatMap(processor -> { + if (mappers.size() == 1) { + return processor.apply( + record, + mappers.get(0).apply(record)); + } + return processor.apply( + record, + Flux.fromIterable(mappers) + .flatMap(mapper -> mapper.apply(record)) + ); + }); + } + + @Override + public String getId() { + return FeatureId.ValueMap.of(function).getId(); + } + + private ExistsProcessor transformProcessor(Object value) { + if (value instanceof ExistsProcessor) { + return (ExistsProcessor) value; + } + if (value instanceof ExistsSpec) { + return ((ExistsSpec) value).compile(); + } + return null; + } + + public interface ExistsProcessor extends BiFunction, Mono> { + @Override + Mono apply(ReactorQLRecord record, Publisher publisher); + + Mono apply(ReactorQLRecord record, List list); + } + + @AllArgsConstructor + static class ReactorQLProcessor implements ExistsProcessor { + private final SqlRequest request; + private final ReactorQL ql; + + public Mono apply(ReactorQLRecord record, List list) { + + Flux data = Flux + .fromIterable(list) + .map(v -> { + Map _data = Maps.newHashMapWithExpectedSize(3); + _data.put(COL_ROW,record.getRecord()); + _data.put(COL_ELEMENT, v); + _data.put(COL_SELF, list); + + return _data; + }); + + DefaultReactorQLContext ctx = new DefaultReactorQLContext((t) -> data); + for (Object parameter : request.getParameters()) { + ctx.bind(parameter); + } + + return ql.start(ctx) + .hasElements(); + } + + @Override + public Mono apply(ReactorQLRecord record, Publisher publisher) { + if (publisher instanceof Mono) { + return ((Mono) publisher) + .map(ConverterUtils::convertToList) + .flatMap(list -> apply(record, list)); + } + return Flux + .from(publisher) + .as(CastUtils::flatStream) + .collectList() + .flatMap(list -> apply(record, list)); + + } + + @Override + public String toString() { + return request.toNativeSql(); + } + } + + private static FunctionTerm convertFunctionTerm(Object obj) { + if (obj instanceof FunctionTerm) { + return (FunctionTerm) obj; + } + if (obj instanceof Map) { + return convertFunctionTerm(new JSONObject((Map) obj)); + } + throw new UnsupportedOperationException("不支持的类型:" + obj); + } + + private static FunctionTerm convertFunctionTerm(JSONObject obj) { + FunctionTerm term = new FunctionTerm(); + FastBeanCopier.copy(obj, term, "terms"); + JSONArray terms = obj.getJSONArray("terms"); + if (terms != null) { + terms + .forEach(o -> { + term.addTerm(convertFunctionTerm((JSONObject) o)); + }); + } + return term; + } + + @Getter + @Setter + public static class ExistsSpec implements Jsonable { + //过滤 + private List filter; + + //聚合 + private List aggregation; + + @Override + public void fromJson(JSONObject json) { + FastBeanCopier.copy(json, this, "aggregation"); + JSONArray aggregation = json.getJSONArray("aggregation"); + if (aggregation != null) { + this.aggregation = aggregation + .stream() + .map(ComplexExistsFunction::convertFunctionTerm) + .collect(Collectors.toList()); + } + } + + //todo 其他简便配置的方式?如: 任意满足,全部满足等 + + public void walkTerms(Consumer consumer) { + if (filter != null) { + filter.forEach(consumer); + } + if (aggregation != null) { + aggregation.forEach(consumer); + } + } + + private void applyAggregation(BatchSqlFragments fragments, + AtomicInteger count, + FunctionTerm agg, + Map distinct, + Map aliasMapping) { + // 大小写都支持 + AggregationSupport support = AggregationSupport.getNow(agg.getFunction()); + + FunctionColumn col = new FunctionColumn(); + col.setFunction(agg.getFunction()); + col.setColumn("this['" + agg.getColumn() + "']"); + col.setOpts(agg.getOpts() == null ? null : Maps.transformValues(agg.getOpts(), Object.class::cast)); + + SqlFragments frg = support.createSql(col); + String sqlStr = frg.toRequest().toNativeSql(); + String alias = distinct.get(frg.toRequest().toNativeSql()); + + if (alias == null) { + alias = "_agg_" + count.incrementAndGet(); + distinct.put(sqlStr, alias); + aliasMapping.put(agg, alias); + if (count.get() > 1) { + fragments.addSql(","); + } + fragments.add(frg).addSql(alias); + } + + if (CollectionUtils.isNotEmpty(agg.getTerms())) { + for (Term term : agg.getTerms()) { + if (term instanceof FunctionTerm) { + applyAggregation(fragments, + count, + ((FunctionTerm) term), + distinct, + aliasMapping); + } + } + } + + } + + private List createHavingTerm(List aggregation, + Map aliasMapping) { + List terms = new ArrayList<>(aggregation.size()); + for (Term functionTerm : aggregation) { + Term term = new Term(); + term.setColumn(aliasMapping.getOrDefault(functionTerm, functionTerm.getColumn())); + term.setTermType(functionTerm.getTermType()); + term.setValue(functionTerm.getValue()); + term.setOptions(functionTerm.getOptions()); + term.setType(Term.Type.and); + if (CollectionUtils.isNotEmpty(functionTerm.getTerms())) { + term.setTerms(createHavingTerm(functionTerm.getTerms(), aliasMapping)); + } + terms.add(term); + } + return terms; + } + + public ExistsProcessor compile() { + + SqlFragments cols; + SqlFragments having; + + if (CollectionUtils.isNotEmpty(aggregation)) { + Map distinct = new HashMap<>(); + Map aliasMapping = new HashMap<>(); + + BatchSqlFragments cols_ = new BatchSqlFragments(); + AtomicInteger count = new AtomicInteger(1); + count.incrementAndGet(); + cols_.addSql(COL_SELF,",",COL_ELEMENT); + for (FunctionTerm functionTerm : aggregation) { + applyAggregation(cols_, count, functionTerm, distinct, aliasMapping); + } + cols = cols_; + List havingTerms = createHavingTerm(aggregation, aliasMapping); + having = ReactorUtils.createFilterSql(havingTerms); + } else { + cols = SqlFragments.ONE; + having = EmptySqlFragments.INSTANCE; + } + + SqlFragments where = ReactorUtils.createFilterSql(filter); + + BatchSqlFragments fragments = new BatchSqlFragments(); + + if (having.isNotEmpty()) { + fragments.addSql("select 1 from ("); + } + + fragments.addSql("select").addFragments(cols).addSql("from t"); + if (where.isNotEmpty()) { + fragments.addSql("where").addFragments(where); + } + + if (having.isNotEmpty()) { + fragments.add(SqlFragments.RIGHT_BRACKET) + .add(SqlFragments.WHERE) + .addFragments(having); + } + SqlRequest request = fragments.toRequest(); + + ReactorQL ql = ReactorQL.builder() + .sql(request.getSql()) + .build(); + + return new ReactorQLProcessor(request, ql); + } + } + + +} diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/reactorql/term/ExistsTermSupport.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/reactorql/term/ExistsTermSupport.java new file mode 100644 index 000000000..cdb52da69 --- /dev/null +++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/reactorql/term/ExistsTermSupport.java @@ -0,0 +1,73 @@ +package org.jetlinks.community.reactorql.term; + +import org.hswebframework.ezorm.core.param.Term; +import org.hswebframework.ezorm.rdb.operator.builder.fragments.BatchSqlFragments; +import org.hswebframework.ezorm.rdb.operator.builder.fragments.SqlFragments; +import org.jetlinks.community.reactorql.impl.ComplexExistsFunction; +import org.jetlinks.core.metadata.DataType; +import org.jetlinks.core.metadata.types.ArrayType; + +import java.util.function.BiFunction; + +/** + * @author zhangji 2025/1/23 + * @since 2.3 + */ +public class ExistsTermSupport implements TermTypeSupport { + static { + ComplexExistsFunction.register(); + } + + @Override + public String getType() { + return "complex_exists"; + } + + @Override + public String getName() { + return "满足"; + } + + @Override + public boolean isSupported(DataType type) { + return type instanceof ArrayType; + } + + @Override + public Term refactorTerm(String tableName, + Term term, + BiFunction refactor) { + Term t = refactor.apply(tableName, term); + ComplexExistsFunction.ExistsSpec existsSpec = ComplexExistsFunction.createExistsSpec(t.getValue()); + + existsSpec.walkTerms(__term -> { + + String col = __term.getColumn(); + //使用 _row 获取原始行数据 + refactor.apply(ComplexExistsFunction.COL_ROW, __term); + //由原始条件指定的列名为准,如: _element.this 、_element.num + __term.setColumn(col); + + }); + + t.setValue(existsSpec); + + return t; + } + + @Override + public SqlFragments createSql(String column, Object value, Term term) { + + ComplexExistsFunction.ExistsSpec existsSpec = ComplexExistsFunction.createExistsSpec(value); + + BatchSqlFragments fragments = new BatchSqlFragments(); + + fragments + .addSql("complex_exists(?,", column, ")") + .addParameter(existsSpec.compile()); + + + return fragments; + } + +} diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/reactorql/term/TermTypeSupport.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/reactorql/term/TermTypeSupport.java index 4b901110d..5acc1c723 100644 --- a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/reactorql/term/TermTypeSupport.java +++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/reactorql/term/TermTypeSupport.java @@ -9,6 +9,7 @@ import java.util.Collections; import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; public interface TermTypeSupport { @@ -40,6 +41,18 @@ public interface TermTypeSupport { */ SqlFragments createSql(String column, Object value, Term term); + /** + * 重构条件 + * + * @param tableName 表名 + * @param term 条件 + * @param refactor 重构函数 + * @return 重构后的条件 + */ + default Term refactorTerm(String tableName, Term term, BiFunction refactor){ + return refactor.apply(tableName,term); + } + /** * 判断是否已经过时,过时的条件应当不可选择. * diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/reactorql/term/TermTypes.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/reactorql/term/TermTypes.java index c92fde853..be7fbece9 100644 --- a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/reactorql/term/TermTypes.java +++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/reactorql/term/TermTypes.java @@ -18,6 +18,7 @@ public class TermTypes { for (FixedTermTypeSupport value : FixedTermTypeSupport.values()) { register(value); } + register(new ExistsTermSupport()); } public static void register(TermTypeSupport support){ @@ -35,6 +36,9 @@ public static List lookup(DataType dataType) { } public static Optional lookupSupport(String type) { + if (type == null) { + return Optional.empty(); + } return Optional.ofNullable(supports.get(type)); } } diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/utils/ReactorUtils.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/utils/ReactorUtils.java index 1bf3261e1..bff779643 100644 --- a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/utils/ReactorUtils.java +++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/utils/ReactorUtils.java @@ -3,6 +3,7 @@ import org.hswebframework.ezorm.core.param.Term; import org.hswebframework.ezorm.rdb.executor.SqlRequest; import org.hswebframework.ezorm.rdb.operator.builder.fragments.AbstractTermsFragmentBuilder; +import org.hswebframework.ezorm.rdb.operator.builder.fragments.EmptySqlFragments; import org.hswebframework.ezorm.rdb.operator.builder.fragments.SqlFragments; import org.hswebframework.web.bean.FastBeanCopier; import org.jetlinks.community.reactorql.term.FixedTermTypeSupport; @@ -132,6 +133,13 @@ public static Function> createFilter(List terms, return createFilter(terms, converter, (arg, data) -> arg); } + public static SqlFragments createFilterSql(List terms) { + if (CollectionUtils.isEmpty(terms)) { + return EmptySqlFragments.INSTANCE; + } + return termBuilder.createTermFragments(null, terms); + } + @SuppressWarnings("all") public static Function> createFilter(List terms, Function> converter, diff --git a/jetlinks-components/relation-component/src/main/java/org/jetlinks/community/relation/utils/VariableSource.java b/jetlinks-components/relation-component/src/main/java/org/jetlinks/community/relation/utils/VariableSource.java index b0ff48758..52487bc9a 100644 --- a/jetlinks-components/relation-component/src/main/java/org/jetlinks/community/relation/utils/VariableSource.java +++ b/jetlinks-components/relation-component/src/main/java/org/jetlinks/community/relation/utils/VariableSource.java @@ -84,6 +84,25 @@ public static VariableSource relation(VariableObjectSpec spec) { return variableSource; } + public static Object resolveStatic(Object value, Map ctx) { + if (value instanceof VariableSource) { + return ((VariableSource) value).resolveStatic(ctx); + } + if (value instanceof Map) { + Map mapVal = ((Map) value); + if (!mapVal.containsKey("$noVariable")) { + Object sourceName = mapVal.get("source"); + if (sourceName != null && VariableSource.Source.of(String.valueOf(sourceName)).isPresent()) { + VariableSource source = FastBeanCopier.copy(mapVal, new VariableSource()); + if (source.getSource() != null) { + return source.resolveStatic(ctx); + } + } + } + } + return value; + } + public static VariableSource of(Object value) { if (value instanceof VariableSource) { return ((VariableSource) value); diff --git a/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/commons/TermsConditionEvaluator.java b/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/commons/TermsConditionEvaluator.java index 4fbbc61b6..a169de553 100644 --- a/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/commons/TermsConditionEvaluator.java +++ b/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/commons/TermsConditionEvaluator.java @@ -1,8 +1,12 @@ package org.jetlinks.community.rule.engine.commons; +import com.alibaba.fastjson.JSONObject; +import org.apache.commons.collections4.CollectionUtils; import org.hswebframework.ezorm.core.param.Term; +import org.hswebframework.ezorm.rdb.operator.builder.fragments.NativeSql; import org.hswebframework.web.api.crud.entity.TermExpressionParser; import org.hswebframework.web.bean.FastBeanCopier; +import org.hswebframework.web.exception.I18nSupportException; import org.jetlinks.community.relation.utils.VariableSource; import org.jetlinks.community.utils.ReactorUtils; import org.jetlinks.reactor.ql.utils.CastUtils; @@ -14,6 +18,7 @@ import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; @@ -35,14 +40,20 @@ * } * * } + *

+ * 建议使用{@link TermsConditionEvaluator#createCondition(List)}来创建用户输入的条件 * * @author zhouhao + * @see TermsConditionEvaluator#createCondition(List) * @since 2.0 */ public class TermsConditionEvaluator implements ConditionEvaluatorStrategy { public static final String TYPE = "terms"; public static Condition createCondition(List terms) { + //校验条件是否合法 + validateUnsafeTerm(terms); + Condition condition = new Condition(); condition.setType(TYPE); condition.setConfiguration(Collections.singletonMap("terms", terms)); @@ -59,6 +70,49 @@ public boolean evaluate(Condition condition, RuleData context) { return false; } + public static void validateUnsafeTerm(List term) { + if (CollectionUtils.isNotEmpty(term)) { + term.forEach(TermsConditionEvaluator::validateUnsafeTerm); + } + } + + public static void validateUnsafeTerm(Term term) { + Object val = term.getValue(); + //如果值是Map,并且包含sql字段,说明是有用户输入了sql参数. + //程序构造应当使用NativeSql + if (val instanceof Map) { + @SuppressWarnings("all") + JSONObject map = new JSONObject((Map) val); + if (map.containsKey("sql")) { + throw new I18nSupportException("error.illegal_term_value"); + } + } + validateUnsafeTerm(term.getTerms()); + } + + private Term convertTermValue(Term term) { + Object val = term.getValue(); + if (val instanceof Map) { + @SuppressWarnings("all") + JSONObject map = new JSONObject((Map) val); + //nativeSql,这里存在缺陷? 用户在特殊情况下可以传入任意sql(不使用 createCondition 方法创建的条件) + //但是此sql使用reactorQL执行,不会直接执行到数据库,所以不会有sql注入风险. + if (map.containsKey("sql")) { + String sql = map.getString("sql"); + Object[] params = map.containsKey("parameters") ? map + .getJSONArray("parameters") + .toArray() : new Object[0]; + term.setValue(NativeSql.of(sql, params)); + } + } + if (CollectionUtils.isNotEmpty(term.getTerms())) { + for (Term termTerm : term.getTerms()) { + convertTermValue(termTerm); + } + } + return term; + } + @Override public Function> prepare(Condition condition) { List terms = condition @@ -66,7 +120,7 @@ public Function> prepare(Condition condition) { .map(val -> CastUtils .castArray(val) .stream() - .map(obj -> FastBeanCopier.copy(obj, new Term())) + .map(obj -> convertTermValue(FastBeanCopier.copy(obj, new Term()))) .collect(Collectors.toList())) .orElseGet(() -> condition .getConfig("where") @@ -77,6 +131,6 @@ public Function> prepare(Condition condition) { return ReactorUtils.createFilter(terms, RuleDataHelper::toContextMap, - (arg, map) -> VariableSource.of(arg).resolveStatic(map)); + VariableSource::resolveStatic); } } diff --git a/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/commons/things/ThingInfoSpec.java b/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/commons/things/ThingInfoSpec.java new file mode 100644 index 000000000..bde283315 --- /dev/null +++ b/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/commons/things/ThingInfoSpec.java @@ -0,0 +1,265 @@ +package org.jetlinks.community.rule.engine.commons.things; + +import com.google.common.collect.Maps; +import lombok.Getter; +import lombok.Setter; +import org.apache.commons.collections4.CollectionUtils; +import org.hswebframework.web.bean.FastBeanCopier; +import org.hswebframework.web.i18n.LocaleUtils; +import org.jetlinks.core.Values; +import org.jetlinks.core.metadata.EventMetadata; +import org.jetlinks.core.metadata.PropertyMetadata; +import org.jetlinks.core.metadata.types.DateTimeType; +import org.jetlinks.core.metadata.types.ObjectType; +import org.jetlinks.core.metadata.types.StringType; +import org.jetlinks.core.things.*; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * + * @author zhangji 2025/1/22 + * @since 2.3 + */ +@Getter +@Setter +public class ThingInfoSpec { + + private Set configs; + private Set properties; + private Set tags; + private Set events; + + public Mono> read(Thing thing, ThingsDataManager dataManager, long time) { + Map data = Maps.newConcurrentMap(); + + Mono task = Mono.empty(); + if (CollectionUtils.isNotEmpty(properties)) { + task = this + .readProperties(dataManager, thing, time) + .doOnNext(v -> data.put("properties", v)); + } + + if (CollectionUtils.isNotEmpty(configs)) { + task = task + .then(readConfigs(thing)) + .doOnNext(v -> data.put("configs", v)); + } + + if (CollectionUtils.isNotEmpty(tags)) { + task = task + .then(readTags(dataManager, thing, time)) + .doOnNext(v -> data.put("tags", v)); + } + if (CollectionUtils.isNotEmpty(events)) { + task = task + .then(readEvent(dataManager, thing, time)) + .doOnNext(v -> data.put("events", v)); + } + + return task.thenReturn(data); + + } + + private Mono> readConfigs(Thing thing) { + + return thing + .getSelfConfigs(configs) + .map(Values::getAllValues); + } + + private Mono> readEvent(ThingsDataManager dataManager, Thing thing, long baseTime) { + if (CollectionUtils.isEmpty(events)) { + return Mono.empty(); + } + int size = tags.size(); + if (size == 1) { + return dataManager + .getLastEvent(thing.getType().getId(), thing.getId(), events.iterator().next(), baseTime) + .map(p -> converterEvent(p, Maps.newHashMapWithExpectedSize(1))); + } + Map tags = Maps.newConcurrentMap(); + return Flux + .fromIterable(this.tags) + .flatMap(property -> dataManager + .getLastEvent(thing.getType().getId(), thing.getId(), property, baseTime) + .map(p -> converterEvent(p, tags))) + .then(Mono.just(tags)); + } + + private Mono> readTags(ThingsDataManager dataManager, Thing thing, long baseTime) { + if (CollectionUtils.isEmpty(tags)) { + return Mono.empty(); + } + int size = tags.size(); + if (size == 1) { + return dataManager + .getLastTag(thing.getType().getId(), thing.getId(), tags.iterator().next(), baseTime) + .map(p -> converterTag(p, Maps.newHashMapWithExpectedSize(1))); + } + Map tags = Maps.newConcurrentMap(); + return Flux + .fromIterable(this.tags) + .flatMap(property -> dataManager + .getLastTag(thing.getType().getId(), thing.getId(), property, baseTime) + .map(p -> converterTag(p, tags))) + .then(Mono.just(tags)); + } + + private Mono> readProperties(ThingsDataManager dataManager, Thing thing, long baseTime) { + if (CollectionUtils.isEmpty(properties)) { + return Mono.empty(); + } + int size = properties.size(); + if (size == 1) { + return dataManager + .getLastProperty(thing.getType().getId(), thing.getId(), properties.iterator().next(), baseTime) + .map(p -> convertProperty(p, Maps.newHashMapWithExpectedSize(1))); + } + Map properties = Maps.newConcurrentMap(); + return Flux + .fromIterable(this.properties) + .flatMap(property -> dataManager + .getLastProperty(thing.getType().getId(), thing.getId(), property, baseTime) + .map(p -> convertProperty(p, properties))) + .then(Mono.just(properties)); + } + + private Map converterEvent(ThingEvent event, Map container) { + Map val = Maps.newHashMapWithExpectedSize(3); + val.put("data", event.getData()); + val.put("timestamp", event.getTimestamp()); + container.put(event.getEvent(), val); + return container; + } + + private Map converterTag(ThingTag tag, Map container) { + Map val = Maps.newHashMapWithExpectedSize(3); + val.put("value", tag.getValue()); + val.put("timestamp", tag.getTimestamp()); + container.put(tag.getTag(), val); + return container; + } + + private Map convertProperty(ThingProperty property, Map container) { + Map val = Maps.newHashMapWithExpectedSize(3); + val.put("value", property.getValue()); + val.put("timestamp", property.getTimestamp()); + val.put("state", property.getState()); + container.put(property.getProperty(), val); + return container; + } + + protected ObjectType createConfigType() { + return new ObjectType(); + } + + protected ObjectType createPropertiesType(ThingMetadata metadata) { + ObjectType type = new ObjectType(); + for (PropertyMetadata tag : metadata.getProperties()) { + type.addProperty( + tag.getId(), + tag.getName(), + new ObjectType() + .addProperty("value", + LocaleUtils.resolveMessage("message.thing.info.spec.property.value", "属性值"), + tag.getValueType()) + .addProperty("state", + LocaleUtils.resolveMessage("message.thing.info.spec.property.value", "属性状态"), + StringType.GLOBAL) + .addProperty("timestamp", + LocaleUtils.resolveMessage("message.thing.info.spec.property.timestamp", "时间戳"), + DateTimeType.GLOBAL) + ); + } + return type; + } + + protected ObjectType createTagsType(ThingMetadata metadata) { + ObjectType type = new ObjectType(); + for (PropertyMetadata tag : metadata.getTags()) { + type.addProperty( + tag.getId(), + tag.getName(), + new ObjectType() + .addProperty("value", + LocaleUtils.resolveMessage("message.thing.info.spec.tag.value", "标签值"), + tag.getValueType()) + .addProperty("timestamp", + LocaleUtils.resolveMessage("message.thing.info.spec.tag.timestamp", "时间戳"), + DateTimeType.GLOBAL) + ); + } + return type; + } + + protected ObjectType createEventsType(ThingMetadata metadata) { + + ObjectType type = new ObjectType(); + for (EventMetadata event : metadata.getEvents()) { + type.addProperty( + event.getId(), + event.getName(), + new ObjectType() + .addProperty("data", + LocaleUtils.resolveMessage("message.thing.info.spec.event.data", "时间数据"), + event.getType()) + .addProperty("timestamp", + LocaleUtils.resolveMessage("message.thing.info.spec.event.timestamp", "时间戳"), + DateTimeType.GLOBAL) + ); + } + return type; + } + + + public ObjectType createOutputType(ThingMetadata metadata) { + ObjectType type = new ObjectType(); + //基本信息 + { + ObjectType _type = createConfigType(); + if (_type != null && CollectionUtils.isNotEmpty(_type.getProperties())) { + type.addProperty("configs", + LocaleUtils.resolveMessage("message.thing.info.spec.configs", "基本信息"), + _type); + } + } + //属性 + { + ObjectType _type = createPropertiesType(metadata); + if (_type != null && CollectionUtils.isNotEmpty(_type.getProperties())) { + type.addProperty("properties", + LocaleUtils.resolveMessage("message.thing.info.spec.properties", "物模型属性"), + _type); + } + } + //标签 + { + ObjectType _type = createTagsType(metadata); + if (_type != null && CollectionUtils.isNotEmpty(_type.getProperties())) { + type.addProperty("tags", + LocaleUtils.resolveMessage("message.thing.info.spec.tags", "标签信息"), + _type); + } + } + //事件 + { + ObjectType _type = createEventsType(metadata); + if (_type != null && CollectionUtils.isNotEmpty(_type.getProperties())) { + type.addProperty("events", + LocaleUtils.resolveMessage("message.thing.info.spec.events", "事件信息"), + _type); + } + } + + return type; + } + + public Map toMap() { + return FastBeanCopier.copy(this, new HashMap<>()); + } +} diff --git a/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/configuration/RuleEngineConfiguration.java b/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/configuration/RuleEngineConfiguration.java index bf020018e..444448084 100644 --- a/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/configuration/RuleEngineConfiguration.java +++ b/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/configuration/RuleEngineConfiguration.java @@ -2,7 +2,10 @@ import lombok.extern.slf4j.Slf4j; import org.jetlinks.community.rule.engine.commons.TermsConditionEvaluator; +import org.jetlinks.community.rule.engine.executor.DeviceSelectorBuilder; +import org.jetlinks.community.rule.engine.executor.device.DeviceDataTaskExecutorProvider; import org.jetlinks.core.event.EventBus; +import org.jetlinks.core.things.ThingsDataManager; import org.jetlinks.rule.engine.api.RuleEngine; import org.jetlinks.rule.engine.api.scheduler.Scheduler; import org.jetlinks.rule.engine.api.task.ConditionEvaluator; @@ -104,4 +107,10 @@ public RuleEngine defaultRuleEngine(Scheduler scheduler) { return new DefaultRuleEngine(scheduler); } + @Bean + public DeviceDataTaskExecutorProvider deviceDataTaskExecutorProvider(ThingsDataManager dataManager, + DeviceSelectorBuilder selectorBuilder) { + return new DeviceDataTaskExecutorProvider(dataManager, selectorBuilder); + } + } diff --git a/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/device/DeviceDataTaskExecutorProvider.java b/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/device/DeviceDataTaskExecutorProvider.java new file mode 100644 index 000000000..efeccc4b5 --- /dev/null +++ b/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/device/DeviceDataTaskExecutorProvider.java @@ -0,0 +1,144 @@ +package org.jetlinks.community.rule.engine.executor.device; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; +import org.hswebframework.web.bean.FastBeanCopier; +import org.hswebframework.web.i18n.LocaleUtils; +import org.jetlinks.community.PropertyConstants; +import org.jetlinks.community.relation.utils.VariableSource; +import org.jetlinks.community.rule.engine.commons.things.ThingInfoSpec; +import org.jetlinks.community.rule.engine.executor.DeviceSelector; +import org.jetlinks.community.rule.engine.executor.DeviceSelectorBuilder; +import org.jetlinks.core.device.DeviceState; +import org.jetlinks.core.metadata.types.DateTimeType; +import org.jetlinks.core.metadata.types.EnumType; +import org.jetlinks.core.metadata.types.ObjectType; +import org.jetlinks.core.metadata.types.StringType; +import org.jetlinks.core.things.ThingsDataManager; +import org.jetlinks.reactor.ql.utils.CastUtils; +import org.jetlinks.rule.engine.api.RuleData; +import org.jetlinks.rule.engine.api.RuleDataHelper; +import org.jetlinks.rule.engine.api.task.ExecutionContext; +import org.jetlinks.rule.engine.api.task.TaskExecutor; +import org.jetlinks.rule.engine.api.task.TaskExecutorProvider; +import org.jetlinks.rule.engine.defaults.FunctionTaskExecutor; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Mono; + +import java.util.Map; + +/** + * + * @author zhangji 2025/1/22 + * @since 2.3 + */ +@AllArgsConstructor +public class DeviceDataTaskExecutorProvider implements TaskExecutorProvider { + + public static final String ID = "device-info"; + + private final ThingsDataManager dataManager; + private final DeviceSelectorBuilder selectorBuilder; + + @Override + public String getExecutor() { + return ID; + } + + @Override + public Mono createTask(ExecutionContext context) { + return Mono.just(new ThingInfoTaskExecutor(context, dataManager, selectorBuilder)); + } + + static class ThingInfoTaskExecutor extends FunctionTaskExecutor { + + private Config config; + private DeviceSelector selector; + private final ThingsDataManager dataManager; + private final DeviceSelectorBuilder selectorBuilder; + + public ThingInfoTaskExecutor(ExecutionContext context, + ThingsDataManager dataManager, + DeviceSelectorBuilder selectorBuilder) { + super("获取设备信息", context); + this.dataManager = dataManager; + this.selectorBuilder = selectorBuilder; + reload(); + } + + @Override + public void reload() { + this.config = createConfig(); + this.selector = selectorBuilder.createSelector(config.getSelector()); + } + + @Override + protected Publisher apply(RuleData input) { + Map ctx = RuleDataHelper.toContextMap(input); + long ts = config.resolveTime(ctx); + return selector + .select(ctx) + .flatMap(device -> config + .read(device, dataManager, ts) + .map(output -> context.newRuleData(input, output))); + } + + private Config createConfig() { + return FastBeanCopier.copy(context.getJob().getConfiguration(), new Config()); + } + } + + + @Getter + @Setter + public static class Config extends ThingInfoSpec { + @Schema(title = "设备选择器") + private DeviceSelectorSpec selector; + + @Schema(title = "基准时间来源") + private VariableSource baseTime; + + @Override + protected ObjectType createConfigType() { + return new ObjectType() + .addProperty("state", + LocaleUtils.resolveMessage("message.thing.info.spec.config.state", "在线状态"), + new EnumType() + .addElement(EnumType.Element.of( + String.valueOf(DeviceState.online), + LocaleUtils.resolveMessage("message.thing.info.spec.config.state-online", "在线"))) + .addElement(EnumType.Element.of( + String.valueOf(DeviceState.offline), + LocaleUtils.resolveMessage("message.thing.info.spec.config.state-offline", "离线"))) + ) + .addProperty("onlineTime", + LocaleUtils.resolveMessage("message.thing.info.spec.config.onlineTime", "上一次在线时间"), + DateTimeType.GLOBAL + ) + .addProperty("offlineTime", + LocaleUtils.resolveMessage("message.thing.info.spec.config.offlineTime", "上一次离线时间"), + DateTimeType.GLOBAL + ) + .addProperty(PropertyConstants.deviceName.getKey(), + LocaleUtils.resolveMessage("message.thing.info.spec.config.deviceName", "设备名称"), + StringType.GLOBAL + ) + .addProperty(PropertyConstants.productName.getKey(), + LocaleUtils.resolveMessage("message.thing.info.spec.config.productName", "产品名称"), + StringType.GLOBAL + ); + } + + long resolveTime(Map ctx) { + if (baseTime != null) { + Number time = CastUtils.castNumber(baseTime.resolveStatic(ctx), (ignore) -> null); + if (time != null) { + return time.longValue(); + } + } + return System.currentTimeMillis(); + } + } +} diff --git a/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/providers/AlarmProvider.java b/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/providers/AlarmProvider.java index 29d360782..0568dae5a 100755 --- a/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/providers/AlarmProvider.java +++ b/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/providers/AlarmProvider.java @@ -17,10 +17,11 @@ import org.jetlinks.core.metadata.DefaultConfigMetadata; import org.jetlinks.core.metadata.PropertyMetadata; import org.jetlinks.core.metadata.SimplePropertyMetadata; +import org.jetlinks.core.metadata.types.IntType; +import org.jetlinks.core.metadata.types.LongType; import org.jetlinks.core.metadata.types.StringType; import org.jetlinks.core.utils.FluxUtils; import org.springframework.stereotype.Component; -import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -28,7 +29,6 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; -import java.util.function.Consumer; @Component @Slf4j @@ -67,7 +67,6 @@ public Mono createSubscriber(String id, Authentication authenticatio } - protected Mono doCreateSubscriber(String id, Authentication authentication, String topic) { @@ -76,21 +75,26 @@ protected Mono doCreateSubscriber(String id, .as(FluxUtils.distinct(Notify::getDataId, Duration.ofSeconds(10)))); } - protected String getAlarmId( Map config) { + protected String getAlarmId(Map config) { ValueObject configs = ValueObject.of(config); return configs.getString("alarmConfigId").orElse("*"); } private Flux createSubscribe(Locale locale, String id, - String[] topics) { - Subscription.Feature[] features = new Subscription.Feature[]{Subscription.Feature.local}; - return Flux - .defer(() -> this - .eventBus - .subscribe(Subscription.of("alarm:" + id, topics, features)) - .map(msg -> { - JSONObject json = msg.bodyToJson(); + String[] topic) { + return this + .eventBus + .subscribe( + Subscription + .builder() + .justLocal() + .subscriberId("alarm:" + id) + .topics(topic) + .build()) + .mapNotNull(payload -> { + try { + JSONObject json = payload.bodyToJson(); return Notify.of( getNotifyMessage(locale, json), //告警记录ID @@ -99,17 +103,20 @@ private Flux createSubscribe(Locale locale, "alarm", json ); - })); + } catch (Throwable error) { + log.warn("handle alarm notify error", error); + } + return null; + }); } - private static String getNotifyMessage(Locale locale, JSONObject json) { String message; TargetType targetType = TargetType.of(json.getString("targetType")); String targetName = json.getString("targetName"); String alarmName = json.getString("alarmConfigName"); - if (targetType == TargetType.other) { + if (targetType == TargetType.scene) { message = String.format("[%s]发生告警:[%s]!", targetName, alarmName); } else { message = String.format("%s[%s]发生告警:[%s]!", targetType.getText(), targetName, alarmName); @@ -123,7 +130,11 @@ public Flux getDetailProperties(Map config) { return Flux.just( SimplePropertyMetadata.of("targetType", "告警类型", StringType.GLOBAL), SimplePropertyMetadata.of("alarmConfigName", "告警名称", StringType.GLOBAL), - SimplePropertyMetadata.of("targetName", "目标名称", StringType.GLOBAL) + SimplePropertyMetadata.of("targetName", "告警目标名称", StringType.GLOBAL), + SimplePropertyMetadata.of("level", "告警级别", IntType.GLOBAL), + SimplePropertyMetadata.of("alarmTime", "告警时间", LongType.GLOBAL), + SimplePropertyMetadata.of("sourceType", "告警源类型", StringType.GLOBAL), + SimplePropertyMetadata.of("sourceName", "告警源名称", StringType.GLOBAL) ); } @@ -132,7 +143,7 @@ public Flux getDetailProperties(Map config) { enum TargetType { device("设备"), product("产品"), - other("其它"); + scene("场景"); private final String text; @@ -142,7 +153,7 @@ public static TargetType of(String name) { return value; } } - return TargetType.other; + return TargetType.scene; } } } diff --git a/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/providers/AlarmOtherProvider.java b/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/providers/AlarmSceneProvider.java similarity index 80% rename from jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/providers/AlarmOtherProvider.java rename to jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/providers/AlarmSceneProvider.java index 206766b3c..be80a91d4 100644 --- a/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/providers/AlarmOtherProvider.java +++ b/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/providers/AlarmSceneProvider.java @@ -12,9 +12,9 @@ @Component @Slf4j -public class AlarmOtherProvider extends AlarmProvider { +public class AlarmSceneProvider extends AlarmProvider { - public AlarmOtherProvider(EventBus eventBus) { + public AlarmSceneProvider(EventBus eventBus) { super(eventBus); } @@ -25,12 +25,12 @@ public String getId() { @Override public String getName() { - return "其他告警"; + return "场景告警"; } @Override public Mono createSubscriber(String id, Authentication authentication, Map config) { - String topic = Topics.alarm(TargetType.other.name(), "*", getAlarmId(config)); + String topic = Topics.alarm(TargetType.scene.name(), "*", getAlarmId(config)); return doCreateSubscriber(id, authentication, topic); } diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/AlarmTarget.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/AlarmTarget.java index 10ca5907c..3075c88cd 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/AlarmTarget.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/AlarmTarget.java @@ -16,6 +16,10 @@ public interface AlarmTarget { Flux convert(AlarmData data); + default boolean isSupported(String trigger) { + return true; + }; + static AlarmTarget of(String type) { return AlarmTargetSupplier .get() diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/AlarmTaskExecutorProvider.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/AlarmTaskExecutorProvider.java index ba0177d69..9e3f52eaa 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/AlarmTaskExecutorProvider.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/AlarmTaskExecutorProvider.java @@ -23,7 +23,6 @@ import java.util.function.Function; @AllArgsConstructor -@Component @Slf4j public class AlarmTaskExecutorProvider implements TaskExecutorProvider { public static final String executor = "alarm"; diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/CustomAlarmTargetSupplier.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/CustomAlarmTargetSupplier.java index 5b09ed2e1..acb848e1c 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/CustomAlarmTargetSupplier.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/CustomAlarmTargetSupplier.java @@ -13,12 +13,6 @@ public class CustomAlarmTargetSupplier implements AlarmTargetSupplier { public static CustomAlarmTargetSupplier defaultSupplier = new CustomAlarmTargetSupplier(); - static { - register(new ProductAlarmTarget()); - register(new DeviceAlarmTarget()); - register(new OtherAlarmTarget()); - } - public static void register(AlarmTarget target) { targets.put(target.getType(), target); } diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/DeviceAlarmTarget.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/DeviceAlarmTarget.java index 12c166df2..b353fd671 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/DeviceAlarmTarget.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/DeviceAlarmTarget.java @@ -1,5 +1,7 @@ package org.jetlinks.community.rule.engine.alarm; +import org.jetlinks.community.rule.engine.scene.internal.triggers.DeviceTriggerProvider; +import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; import java.util.Map; @@ -7,7 +9,7 @@ /** * @author bestfeng */ - +@Component public class DeviceAlarmTarget extends AbstractAlarmTarget { @Override @@ -33,4 +35,9 @@ public Flux doConvert(AlarmData data) { return Flux.just(AlarmTargetInfo.of(deviceId, deviceName, getType())); } + @Override + public boolean isSupported(String trigger) { + return DeviceTriggerProvider.PROVIDER.equals(trigger); + }; + } diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/ProductAlarmTarget.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/ProductAlarmTarget.java index 2287b82db..0c8f7e0f8 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/ProductAlarmTarget.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/ProductAlarmTarget.java @@ -1,5 +1,7 @@ package org.jetlinks.community.rule.engine.alarm; +import org.jetlinks.community.rule.engine.scene.internal.triggers.DeviceTriggerProvider; +import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; import java.util.Map; @@ -7,7 +9,7 @@ /** * @author bestfeng */ - +@Component public class ProductAlarmTarget extends AbstractAlarmTarget { @Override @@ -29,5 +31,9 @@ public Flux doConvert(AlarmData data) { return Flux.just(AlarmTargetInfo.of(productId, productName, getType())); } + @Override + public boolean isSupported(String trigger) { + return DeviceTriggerProvider.PROVIDER.equals(trigger); + }; } diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/OtherAlarmTarget.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/SceneAlarmTarget.java similarity index 56% rename from jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/OtherAlarmTarget.java rename to jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/SceneAlarmTarget.java index 4e81cd2f9..3101b3bb2 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/OtherAlarmTarget.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/SceneAlarmTarget.java @@ -1,21 +1,24 @@ package org.jetlinks.community.rule.engine.alarm; +import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; /** * @author bestfeng */ +@Component +public class SceneAlarmTarget implements AlarmTarget { -public class OtherAlarmTarget implements AlarmTarget { + public static final String TYPE = "scene"; @Override public String getType() { - return "other"; + return TYPE; } @Override public String getName() { - return "其它"; + return "场景"; } @Override @@ -23,7 +26,8 @@ public Flux convert(AlarmData data) { return Flux.just(AlarmTargetInfo .of(data.getRuleId(), data.getRuleName(), - getType())); + getType()) + .withSource(TYPE, data.getRuleId(), data.getRuleName())); } } diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/configuration/AlarmTargetConfiguration.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/configuration/AlarmTargetConfiguration.java new file mode 100644 index 000000000..2a1ab621a --- /dev/null +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/configuration/AlarmTargetConfiguration.java @@ -0,0 +1,26 @@ +package org.jetlinks.community.rule.engine.configuration; + +import org.jetlinks.community.rule.engine.alarm.AlarmRuleHandler; +import org.jetlinks.community.rule.engine.alarm.AlarmTarget; +import org.jetlinks.community.rule.engine.alarm.AlarmTaskExecutorProvider; +import org.jetlinks.community.rule.engine.alarm.CustomAlarmTargetSupplier; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.context.annotation.Bean; + +/** + * + * @author zhangji 2025/1/22 + * @since 2.3 + */ +@AutoConfiguration +public class AlarmTargetConfiguration { + + @Bean + public AlarmTaskExecutorProvider alarmTaskExecutorProvider(AlarmRuleHandler alarmHandler, + ObjectProvider targetProviders) { + targetProviders.forEach(CustomAlarmTargetSupplier::register); + return new AlarmTaskExecutorProvider(alarmHandler); + } + +} diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/DeviceOperation.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/DeviceOperation.java index ef3c84998..ab5d66581 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/DeviceOperation.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/DeviceOperation.java @@ -5,6 +5,7 @@ import lombok.Getter; import lombok.Setter; import org.hswebframework.web.i18n.LocaleUtils; +import org.jetlinks.community.rule.engine.utils.TermColumnUtils; import org.jetlinks.core.message.function.FunctionInvokeMessage; import org.jetlinks.core.message.function.FunctionParameter; import org.jetlinks.core.message.property.ReadPropertyMessage; @@ -129,18 +130,21 @@ public List parseTermColumns(ThingMetadata metadata) { || operator == Operator.reportProperty || operator == Operator.writeProperty) { terms.addAll( - this.createTerm( + TermColumnUtils.createTerm( metadata.getProperties(), - (property, column) -> column.setChildren(createTermColumn("properties", property, true, PropertyValueType.values())), + (property, column) -> column.setChildren(TermColumnUtils.createTermColumn("properties", + property, + true, + PropertyValueType.values())), LocaleUtils.resolveMessage("message.device_metadata_property", "属性")) ); } else { //其他操作只能获取属性的上一次的值 terms.addAll( - this.createTerm( + TermColumnUtils.createTerm( metadata.getProperties(), (property, column) -> column.setChildren( - createTermColumn( + TermColumnUtils.createTermColumn( "properties", property, true, @@ -151,7 +155,7 @@ public List parseTermColumns(ThingMetadata metadata) { //事件上报 if (operator == Operator.reportEvent) { terms.addAll( - this.createTerm( + TermColumnUtils.createTerm( metadata.getEvent(eventId) .>map(event -> Collections .singletonList( @@ -160,13 +164,13 @@ public List parseTermColumns(ThingMetadata metadata) { event.getType()) )) .orElse(Collections.emptyList()), - (property, column) -> column.setChildren(createTermColumn("event", property, false)), + (property, column) -> column.setChildren(TermColumnUtils.createTermColumn("event", property, false)), LocaleUtils.resolveMessage("message.device_metadata_event", "事件"))); } //调用功能 if (operator == Operator.invokeFunction) { terms.addAll( - this.createTerm( + TermColumnUtils.createTerm( metadata.getFunction(functionId) //过滤掉异步功能和无返回值功能的参数输出 .filter(fun -> !fun.isAsync() && !(fun.getOutput() instanceof UnknownType)) @@ -176,76 +180,13 @@ public List parseTermColumns(ThingMetadata metadata) { meta.getOutput())) ) .orElse(Collections.emptyList()), - (property, column) -> column.setChildren(createTermColumn("function", property, false)), + (property, column) -> column.setChildren(TermColumnUtils.createTermColumn("function", property, false)), LocaleUtils.resolveMessage("message.device_metadata_function", "功能调用"))); } return TermColumn.refactorTermsInfo("properties", terms); } - private String resolveI18n(String key, String name) { - return LocaleUtils.resolveMessage(key, name); - } - - private List createTermColumn(String prefix, PropertyMetadata property, boolean last, PropertyValueType... valueTypes) { - //对象类型嵌套 - if (property.getValueType() instanceof ObjectType) { - ObjectType objType = ((ObjectType) property.getValueType()); - return this.createTerm( - objType.getProperties(), - (prop, column) -> { - String _prefix = prefix == null ? property.getId() : prefix + "." + property.getId(); - if (!last && !(prop.getValueType() instanceof ObjectType)) { - TermColumn term = createTermColumn(_prefix, prop, false, valueTypes).get(0); - column.setColumn(term.getColumn()); - column.setName(term.getName()); - column.setOptions(term.getOptions()); - column.withOthers(term.getOthers()); - } else { - column.setChildren(createTermColumn(_prefix, prop, last, valueTypes)); - } - }); - - } else { - if (!last) { - return Collections.singletonList( - TermColumn.of(SceneUtils.appendColumn(prefix, property.getId()), - property.getName(), property.getValueType()) - .withMetrics(property) - .withMetadataTrue() - ); - } - return Arrays - .stream(valueTypes) - .map(type -> TermColumn - .of(SceneUtils - .appendColumn(prefix, - property.getId(), - type.name()), - type.getKey(), - null, - type.dataType == null ? property.getValueType() : type.dataType) - .withMetrics(property) - .withMetadataTrue() - ) - .collect(Collectors.toList()); - - } - } - - private List createTerm(List metadataList, - BiConsumer consumer, - String... description) { - List columns = new ArrayList<>(metadataList.size()); - for (PropertyMetadata metadata : metadataList) { - TermColumn column = TermColumn.of(metadata); - column.setDescription(String.join("", description)); - consumer.accept(metadata, column); - columns.add(column.withMetadataTrue()); - } - return columns; - } - public void validate() { Assert.notNull(operator, "error.scene_rule_trigger_device_operation_cannot_be_null"); switch (operator) { diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneActionProvider.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneActionProvider.java index 12d434dfc..9ccf73e8f 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneActionProvider.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneActionProvider.java @@ -1,29 +1,74 @@ package org.jetlinks.community.rule.engine.scene; +import org.jetlinks.core.utils.Reactors; import org.jetlinks.core.utils.SerializeUtils; import org.jetlinks.community.rule.engine.alarm.AlarmConstants; import org.jetlinks.community.terms.TermSpec; import org.jetlinks.rule.engine.api.model.RuleNodeModel; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import java.util.List; public interface SceneActionProvider { + /** + * 提供商标识 + * + * @return 提供商标识 + */ String getProvider(); + /** + * 创建一个新的配置 + * + * @return 配置 + */ C newConfig(); + /** + * 尝试从动作的变量中提取出需要动态获取的列信息 + * + * @param config 配置 + * @return 名称 + */ List parseColumns(C config); + /** + * 根据配置创建变量,用于获取此动作将要输出的变量 + * + * @param config 配置 + * @return 变量 + */ Flux createVariable(C config); + /** + * 应用配置到规则节点 + * + * @param config 配置 + * @param model 规则节点 + */ void applyRuleNode(C config, RuleNodeModel model); + /** + * 应用过滤条件描述到规则节点 + * + * @param node 规则节点 + * @param specs 过滤条件 + */ default void applyFilterSpec(RuleNodeModel node, List specs) { node.addConfiguration( AlarmConstants.ConfigKey.alarmFilterTermSpecs, SerializeUtils.convertToSafelySerializable(specs) - ); + ); + } + + /** + * 获取详细类型, + * 用于区分同一个类型支持的多个执行动作 + * @return 详细类型 + */ + default List getMode() { + return null; } } diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneRule.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneRule.java index 3242b28ef..5fed42b55 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneRule.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneRule.java @@ -161,7 +161,7 @@ private Flux createSceneVariables(List columns) { )); List defaultVariables = createDefaultVariable(); - List termVar = SceneUtils.parseVariable(terms, columns); + List termVar = trigger.provider().parseVariable(terms, columns); List variables = new ArrayList<>(defaultVariables.size() + termVar.size()); variables.addAll(defaultVariables); @@ -561,7 +561,7 @@ public Mono toModel() { }); link.setCondition(TermsConditionEvaluator.createCondition( - trigger.refactorTerm("this", termList))); + trigger.refactorTerm("this", preAction.getTerms()))); } } else if (Objects.equals(trigger.getType(), ManualTriggerProvider.PROVIDER)) { diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneTriggerProvider.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneTriggerProvider.java index 6c8fe1a46..2b06af870 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneTriggerProvider.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneTriggerProvider.java @@ -9,6 +9,7 @@ import org.jetlinks.community.rule.engine.commons.impl.SimpleShakeLimitProvider; import org.jetlinks.community.rule.engine.scene.term.TermColumn; import org.jetlinks.community.terms.TermSpec; +import org.jetlinks.core.utils.Reactors; import org.jetlinks.rule.engine.api.model.RuleModel; import org.jetlinks.rule.engine.api.model.RuleNodeModel; import reactor.core.publisher.Flux; @@ -98,6 +99,11 @@ default Mono> createFilterSpec(E config, List terms, BiCons */ List createDefaultVariable(E config); + default List parseVariable(List terms, + List columns) { + return SceneUtils.parseVariable(terms, columns); + } + /** * 应用规则节点配置 * diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneUtils.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneUtils.java index 1894285e9..100ab5e2e 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneUtils.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneUtils.java @@ -2,8 +2,10 @@ import org.apache.commons.collections4.CollectionUtils; import org.hswebframework.ezorm.core.param.Term; +import org.hswebframework.ezorm.rdb.executor.SqlRequest; import org.hswebframework.ezorm.rdb.operator.builder.fragments.NativeSql; import org.jetlinks.community.PropertyMetric; +import org.jetlinks.community.reactorql.function.FunctionSupport; import org.jetlinks.community.reactorql.term.TermType; import org.jetlinks.community.rule.engine.executor.device.DeviceSelectorProviders; import org.jetlinks.community.rule.engine.scene.term.TermColumn; @@ -14,6 +16,7 @@ import reactor.core.publisher.Mono; import java.util.*; +import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Collectors; @@ -154,18 +157,15 @@ private static List columnToVariable(String prefixName, return variables; } - public static Flux getSupportTriggers() { - return Flux - .fromIterable(SceneProviders.triggerProviders()) - .map(SceneTriggerProvider::getProvider); + public static Flux> getSupportTriggers() { + return Flux.fromIterable(SceneProviders.triggerProviders()); } - public static Flux getSupportActions() { - return Flux - .fromIterable(SceneProviders.actionProviders()) - .map(SceneActionProvider::getProvider); + public static Flux> getSupportActions() { + return Flux.fromIterable(SceneProviders.actionProviders()); } + public static Flux parseTermColumns(SceneRule ruleMono) { Trigger trigger = ruleMono.getTrigger(); if (trigger != null) { @@ -196,7 +196,9 @@ public static Flux getDeviceSelectors() { .map(SelectorInfo::of); } - public static Term refactorTerm(String tableName, Term term) { + public static Term refactorTerm(String tableName, + Term term, + BiFunction columnRefactor) { if (term.getColumn() == null) { return term; } @@ -219,6 +221,15 @@ else if (value.getSource() == TermValue.Source.metric) { term.getOptions().add(TermType.OPTIONS_NATIVE_SQL); return tableName + "['" + arr[1] + "_metric_" + value.getMetric() + "']"; } + //函数, 如: array_len() , device_prop() + else if (value.getSource() == TermValue.Source.function) { + SqlRequest request = FunctionSupport + .supports + .getNow(value.getFunction()) + .createSql(columnRefactor.apply(tableName, value.getColumn()), value.getArgs()) + .toRequest(); + return NativeSql.of(request.getSql(), request.getParameters()); + } //手动设置值 else { return value.getValue(); @@ -234,28 +245,42 @@ else if (value.getSource() == TermValue.Source.metric) { .collect(Collectors.toList()); } - String column; - // properties.xxx.last的场景 - if (arr.length > 3 && arr[0].equals("properties")) { - column = tableName + "['" + createColumnAlias("properties", term.getColumn(), false) - + "." + String.join(".", Arrays.copyOfRange(arr, 2, arr.length - 1)) + "']"; - } else if (!isDirectTerm(arr[0])) { - column = tableName + "['" + createColumnAlias("properties", term.getColumn(), false) + "']"; - } else { - column = term.getColumn(); - } - if (term.getOptions().contains(TermType.OPTIONS_NATIVE_SQL) && !(val instanceof NativeSql)) { val = NativeSql.of(String.valueOf(val)); } - term.setColumn(column); + term.setColumn(columnRefactor.apply(tableName, term.getColumn())); term.setValue(val); return term; } + public static Term refactorTerm(String tableName, Term term) { + return refactorTerm(tableName, term, SceneUtils::refactorColumn); + } + + private static String refactorColumn(String tableName, String column) { + String[] arr = column.split("[.]"); + // fixme 重构 条件列解析逻辑 + // properties.xxx.last的场景 + if (arr.length > 3 && arr[0].equals("properties")) { + return tableName + "['" + createColumnAlias("properties", column, false) + + "." + String.join(".", Arrays.copyOfRange(arr, 2, arr.length - 1)) + "']"; + } else if (!isDirectTerm(arr[0])) { + return tableName + "['" + createColumnAlias(arr[0], column, false) + "']"; + } else { + // scene.obj1.xx.val1.current => t['scene.obj1_current.val1'] + if (arr.length > 3 && isSceneTerm(column)) { + return tableName + "['" + arr[0] + "." + createColumnAlias(arr[0], column, false) + + "." + String.join(".", Arrays.copyOfRange(arr, 2, arr.length - 1)) + + "']"; + } else { + return tableName + "['" + column + "']"; + } + } + } + private static boolean isDirectTerm(String column) { //直接term,构建Condition输出条件时使用 diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/Trigger.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/Trigger.java index 871535523..17a76d8d8 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/Trigger.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/Trigger.java @@ -9,6 +9,8 @@ import org.hswebframework.ezorm.rdb.executor.SqlRequest; import org.hswebframework.ezorm.rdb.operator.builder.fragments.EmptySqlFragments; import org.hswebframework.ezorm.rdb.operator.builder.fragments.SqlFragments; +import org.jetlinks.community.reactorql.term.TermTypeSupport; +import org.jetlinks.community.reactorql.term.TermTypes; import org.jetlinks.community.rule.engine.commons.ShakeLimit; import org.jetlinks.community.rule.engine.scene.internal.triggers.*; import org.jetlinks.community.rule.engine.scene.term.TermColumn; @@ -72,7 +74,12 @@ public List refactorTerm(String tableName, List terms) { List target = new ArrayList<>(terms.size()); for (Term term : terms) { Term copy = term.clone(); - target.add(provider().refactorTerm(tableName, copy)); + TermTypeSupport support = TermTypes.lookupSupport(term.getTermType()).orElse(null); + if (support != null) { + target.add(support.refactorTerm(tableName, copy, provider()::refactorTerm)); + } else { + target.add(provider().refactorTerm(tableName, copy)); + } if (org.apache.commons.collections4.CollectionUtils.isNotEmpty(copy.getTerms())) { copy.setTerms(refactorTerm(tableName, copy.getTerms())); } diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/internal/actions/AlarmActionProvider.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/internal/actions/AlarmActionProvider.java index 4ce4bd463..c300177f1 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/internal/actions/AlarmActionProvider.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/internal/actions/AlarmActionProvider.java @@ -2,12 +2,14 @@ import org.hswebframework.web.bean.FastBeanCopier; import org.jetlinks.community.rule.engine.alarm.AlarmTaskExecutorProvider; +import org.jetlinks.community.rule.engine.enums.AlarmMode; import org.jetlinks.community.rule.engine.scene.SceneActionProvider; import org.jetlinks.community.rule.engine.scene.Variable; import org.jetlinks.rule.engine.api.model.RuleNodeModel; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -40,4 +42,9 @@ public void applyRuleNode(AlarmAction config, RuleNodeModel model) { model.setExecutor(AlarmTaskExecutorProvider.executor); model.setConfiguration(FastBeanCopier.copy(config, new HashMap<>())); } + + @Override + public List getMode() { + return Arrays.asList(AlarmMode.trigger.name(), AlarmMode.relieve.name()); + } } diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/internal/actions/DeviceDataActionProvider.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/internal/actions/DeviceDataActionProvider.java new file mode 100644 index 000000000..5bbd59cc5 --- /dev/null +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/internal/actions/DeviceDataActionProvider.java @@ -0,0 +1,99 @@ +package org.jetlinks.community.rule.engine.scene.internal.actions; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; +import org.hswebframework.web.bean.FastBeanCopier; +import org.jetlinks.community.rule.engine.executor.device.DeviceDataTaskExecutorProvider; +import org.jetlinks.community.rule.engine.executor.device.DeviceSelectorProviders; +import org.jetlinks.community.rule.engine.executor.device.DeviceSelectorSpec; +import org.jetlinks.community.rule.engine.scene.SceneAction; +import org.jetlinks.community.rule.engine.scene.SceneActionProvider; +import org.jetlinks.community.rule.engine.scene.Variable; +import org.jetlinks.core.metadata.PropertyMetadata; +import org.jetlinks.core.things.ThingsRegistry; +import org.jetlinks.rule.engine.api.model.RuleNodeModel; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * + * @author zhangji 2025/1/22 + * @since 2.3 + */ +@Component +@AllArgsConstructor +public class DeviceDataActionProvider implements SceneActionProvider { + public static final String PROVIDER = "device-data"; + + private final ThingsRegistry registry; + + @Override + public String getProvider() { + return PROVIDER; + } + + @Override + public DeviceDataAction newConfig() { + return new DeviceDataAction(); + } + + @Override + public List parseColumns(DeviceDataAction config) { + + return Collections.emptyList(); + } + + @Override + public Flux createVariable(DeviceDataAction config) { + return config + .getSelector() + .getDeviceMetadata(registry, config.getProductId()) + .map(config::createOutputType) + .flatMapIterable(type -> { + List props = type.getProperties(); + List variables = new ArrayList<>(props.size()); + for (PropertyMetadata prop : props) { + variables.add( + SceneAction + .toVariable(prop.getId(), + prop.getName(), + prop.getValueType(), + "message.scene.action.device-data." + prop.getId(), + "设备[%s]信息", + null) + ); + + } + return variables; + }); + } + + @Override + public void applyRuleNode(DeviceDataAction config, RuleNodeModel model) { + + model.setExecutor(DeviceDataTaskExecutorProvider.ID); + if (DeviceSelectorProviders.isFixed(config.getSelector())) { + config.setSelector(FastBeanCopier.copy(config.getSelector(), new DeviceSelectorSpec())); + } else { + config.setSelector( + DeviceSelectorProviders.composite( + //先选择产品下的设备 + DeviceSelectorProviders.product(config.getProductId()), + FastBeanCopier.copy(config.getSelector(), new DeviceSelectorSpec()) + )); + } + + model.setConfiguration(config.toMap()); + } + + @Getter + @Setter + public static class DeviceDataAction extends DeviceDataTaskExecutorProvider.Config { + private String productId; + } +} diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/internal/triggers/DeviceTrigger.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/internal/triggers/DeviceTrigger.java index d8fd71086..023d10058 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/internal/triggers/DeviceTrigger.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/internal/triggers/DeviceTrigger.java @@ -242,7 +242,11 @@ protected SqlFragments createTermFragments(DeviceTrigger trigger, Term term) { .lookupSupport(termType) .orElseThrow(() -> new UnsupportedOperationException("unsupported termType " + termType)); - Term copy = refactorTermValue(DEFAULT_FILTER_TABLE, term.clone()); + Term copy = support + .refactorTerm( + DEFAULT_FILTER_TABLE, term.clone(), + DeviceTrigger::refactorTermValue + ); return support.createSql(copy.getColumn(), copy.getValue(), copy); } diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/internal/triggers/DeviceTriggerProvider.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/internal/triggers/DeviceTriggerProvider.java index 10c24a029..437a0a014 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/internal/triggers/DeviceTriggerProvider.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/internal/triggers/DeviceTriggerProvider.java @@ -50,6 +50,11 @@ public SqlFragments createFilter(DeviceTrigger config, List terms) { return config.createFragments(terms); } + @Override + public Term refactorTerm(String mainTableName, Term term) { + return DeviceTrigger.refactorTermValue(mainTableName, term); + } + @Override public List createDefaultVariable(DeviceTrigger config) { return config.createDefaultVariable(); diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/term/TermColumn.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/term/TermColumn.java index 0956ff3ab..72b5f0ac0 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/term/TermColumn.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/term/TermColumn.java @@ -8,6 +8,7 @@ import org.hswebframework.ezorm.core.param.Term; import org.hswebframework.web.bean.FastBeanCopier; import org.hswebframework.web.i18n.LocaleUtils; +import org.jetlinks.community.reactorql.function.FunctionInfo; import org.jetlinks.core.metadata.DataType; import org.jetlinks.core.metadata.PropertyMetadata; import org.jetlinks.core.metadata.types.BooleanType; @@ -66,6 +67,9 @@ public class TermColumn { @Schema(description = "支持的条件类型") private List termTypes; + @Schema(description = "支持的函数") + private List functions; + @Schema(description = "支持的指标") private List metrics; diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/value/TermValue.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/value/TermValue.java index 80db72129..5cf105af8 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/value/TermValue.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/value/TermValue.java @@ -28,6 +28,15 @@ public class TermValue implements Serializable { @Schema(description = "[source]为[metric]时不能为空") private String metric; + @Schema(description = "[source]为[function]时不能为空") + private String function; + + @Schema(description = "[source]为[function]时有效") + private String column; + + @Schema(description = "[source]为[function]时有效") + private Map args; + public static TermValue manual(Object value) { TermValue termValue = new TermValue(); termValue.setValue(value); @@ -66,9 +75,26 @@ public static List of(Object value) { } public enum Source { + + /** + * 和manual一样, + * 兼容{@link org.jetlinks.community.relation.utils.VariableSource.Source#fixed} + */ + fixed, manual, + metric, variable, - upper + /** + * 和variable一样,兼容{@link org.jetlinks.community.relation.utils.VariableSource.Source#upper} + */ + upper, + + /** + * 函数 + * + * @see org.jetlinks.community.reactorql.function.FunctionSupport + */ + function } } diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/utils/TermColumnUtils.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/utils/TermColumnUtils.java new file mode 100644 index 000000000..3e80f36c2 --- /dev/null +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/utils/TermColumnUtils.java @@ -0,0 +1,178 @@ +package org.jetlinks.community.rule.engine.utils; + +import org.hswebframework.web.i18n.LocaleUtils; +import org.jetlinks.community.reactorql.impl.ComplexExistsFunction; +import org.jetlinks.community.rule.engine.scene.DeviceOperation; +import org.jetlinks.community.rule.engine.scene.SceneUtils; +import org.jetlinks.community.rule.engine.scene.term.TermColumn; +import org.jetlinks.core.metadata.DataType; +import org.jetlinks.core.metadata.PropertyMetadata; +import org.jetlinks.core.metadata.SimplePropertyMetadata; +import org.jetlinks.core.metadata.types.ArrayType; +import org.jetlinks.core.metadata.types.IntType; +import org.jetlinks.core.metadata.types.LongType; +import org.jetlinks.core.metadata.types.ObjectType; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; + +import static org.jetlinks.core.metadata.SimplePropertyMetadata.of; + +/** + * + * @author zhangji 2025/1/22 + * @since 2.3 + */ +public class TermColumnUtils { + + public static List parseArrayChildTermColumns(DataType dataType) { + String prefix = ComplexExistsFunction.COL_ELEMENT; + if (!(dataType instanceof ArrayType)) { + return new ArrayList<>(); + } + ArrayType arrayType = (ArrayType) dataType; + List columns = new ArrayList<>(); + if (arrayType.getElementType() instanceof ObjectType) { + List properties = ((ObjectType) arrayType.getElementType()).getProperties(); + return TermColumnUtils.createTerm( + prefix, + properties, + (property, column) -> column.setChildren(TermColumnUtils.createTermColumn(prefix, property, true)), + LocaleUtils.resolveMessage("message.device_metadata_property", "属性")); + } else { + SimplePropertyMetadata prop = + of("this", + resolveI18n("message.term_element_of_array", + "数组元素"), + (arrayType.getElementType() instanceof ArrayType) ? IntType.GLOBAL : arrayType.getElementType()); + columns.addAll(TermColumnUtils.createTermColumn(prefix, prop, false)); + } + + return columns; + } + + + public static List createTermColumn(String prefix, + PropertyMetadata property, + boolean last, + DeviceOperation.PropertyValueType... valueTypes) { + //对象类型嵌套 + if (property.getValueType() instanceof ObjectType) { + ObjectType objType = ((ObjectType) property.getValueType()); + if (objType.getProperties().isEmpty()) { + String _prefix = prefix == null ? property.getId() : prefix + "." + property.getId(); + return createTermColumn0(_prefix, property, last, valueTypes); + } + return createTerm( + objType.getProperties(), + (prop, column) -> { + String _prefix = prefix == null ? property.getId() : prefix + "." + property.getId(); + if (!last && !(prop.getValueType() instanceof ObjectType)) { + TermColumn term = createTermColumn(_prefix, prop, false, valueTypes).get(0); + column.setColumn(term.getColumn()); + column.setName(term.getName()); + column.setOptions(term.getOptions()); + column.withOthers(term.getOthers()); + } else { + column.setColumn(SceneUtils.appendColumn(_prefix, prop.getId())); + column.setChildren(createTermColumn(_prefix, prop, last, valueTypes)); + } + }); + } else if (property.getValueType() instanceof ArrayType) { + PropertyMetadata sizeMetadata = of("size", + resolveI18n("message.term_size_of_array", "长度"), + new LongType()); + PropertyMetadata valueMetadata = of("this", + resolveI18n("message.term_origin_of_array", "原始值"), + property.getValueType()); + String _prefix = prefix == null ? property.getId() : prefix + "." + property.getId(); + + TermColumn size = TermColumn + .of(SceneUtils.appendColumn(_prefix, sizeMetadata.getId()), + sizeMetadata.getName(), sizeMetadata.getValueType()) + .withMetadataTrue(); + + size.setChildren(createTermColumn0(_prefix, sizeMetadata, last, valueTypes)); + + TermColumn valueThis = + TermColumn + .of(SceneUtils.appendColumn(_prefix, valueMetadata.getId()), + valueMetadata.getName(), valueMetadata.getValueType()) + .withMetadataTrue(); + valueThis.setChildren(createTermColumn0(_prefix, valueMetadata, last, valueTypes)); + + + return Arrays.asList(size, valueThis); + + + } else { + return createTermColumn0(prefix, property, last, valueTypes); + } + } + + public static List createTerm(List metadataList, + BiConsumer consumer, + String... description) { + List columns = new ArrayList<>(metadataList.size()); + for (PropertyMetadata metadata : metadataList) { + TermColumn column = TermColumn.of(metadata); + column.setDescription(String.join("", description)); + consumer.accept(metadata, column); + columns.add(column.withMetadataTrue()); + } + return columns; + } + + public static List createTerm(String prefix, + List metadataList, + BiConsumer consumer, + String... description) { + List columns = new ArrayList<>(metadataList.size()); + for (PropertyMetadata metadata : metadataList) { + TermColumn column = TermColumn. + of(SceneUtils.appendColumn(prefix, metadata.getId()), + metadata.getName(), + metadata.getValueType()); + column.setDescription(String.join("", description)); + consumer.accept(metadata, column); + columns.add(column.withMetadataTrue()); + } + return columns; + } + + private static List createTermColumn0(String prefix, + PropertyMetadata property, + boolean last, + DeviceOperation.PropertyValueType... valueTypes) { + if (!last) { + return Collections.singletonList( + TermColumn.of(SceneUtils.appendColumn(prefix, property.getId()), + property.getName(), property.getValueType()) + .withMetrics(property) + .withMetadataTrue() + ); + } + return Arrays + .stream(valueTypes) + .map(type -> TermColumn + .of(SceneUtils + .appendColumn(prefix, + property.getId(), + type.name()), + type.getKey(), + null, + type.getDataType() == null ? property.getValueType() : type.getDataType()) + .withMetrics(property) + .withMetadataTrue() + ) + .collect(Collectors.toList()); + } + + private static String resolveI18n(String key, String name) { + return LocaleUtils.resolveMessage(key, name); + } +} diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/AlarmConfigController.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/AlarmConfigController.java index b0c6fc6f8..ee6743c18 100755 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/AlarmConfigController.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/AlarmConfigController.java @@ -17,6 +17,8 @@ import org.jetlinks.community.rule.engine.entity.AlarmConfigDetail; import org.jetlinks.community.rule.engine.entity.AlarmConfigEntity; import org.jetlinks.community.rule.engine.entity.AlarmLevelEntity; +import org.jetlinks.community.rule.engine.scene.SceneUtils; +import org.jetlinks.community.rule.engine.scene.SceneTriggerProvider; import org.jetlinks.community.rule.engine.service.AlarmConfigService; import org.jetlinks.community.rule.engine.service.AlarmLevelService; import org.jetlinks.community.rule.engine.web.response.AlarmTargetTypeInfo; @@ -55,12 +57,21 @@ public Mono disable(@PathVariable String id) { @GetMapping("/target-type/supports") @Operation(summary = "获取支持的告警目标类型") public Flux getTargetTypeSupports() { + Flux triggerCache = SceneUtils + .getSupportTriggers() + .map(SceneTriggerProvider::getProvider) + .cache(); return Flux .fromIterable(AlarmTargetSupplier .get() .getAll() .values()) - .map(AlarmTargetTypeInfo::of); + .flatMap(alarmTarget -> triggerCache + .filter(alarmTarget::isSupported) + .collectList() + .map(supportTriggers -> AlarmTargetTypeInfo + .of(alarmTarget) + .with(supportTriggers))); } diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/AlarmHistoryController.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/AlarmHistoryController.java index c633c13b9..9118bf170 100755 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/AlarmHistoryController.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/AlarmHistoryController.java @@ -47,4 +47,30 @@ public Mono> queryHandleHistoryPager( .flatMap(alarmHistoryService::queryPager); } + @PostMapping("/alarm-record/{recordId}/_query") + @Operation(summary = "按告警记录查询告警历史") + @QueryAction + public Mono> queryHistoryPager( + @PathVariable @Parameter(description = "告警记录ID") String recordId, + @RequestBody Mono query) { + return query + .map(q -> q + .toNestQuery() + .and(AlarmHistoryInfo::getAlarmRecordId, recordId) + .getParam()) + .flatMap(alarmHistoryService::queryPager); + } + + @PostMapping("/{dimensionType}/{alarmConfigId}/_query") + @Operation(summary = "按维度查询告警历史") + @QueryAction + public Mono> queryHandleHistoryPagerByDimensionType(@PathVariable @Parameter(description = "告警配置ID") String alarmConfigId, + @PathVariable @Parameter(description = "告警维度") String dimensionType, + @RequestBody Mono query) { + return query + .doOnNext(queryParamEntity -> queryParamEntity + .toNestQuery(q -> q.and(AlarmHistoryInfo::getAlarmConfigId, alarmConfigId))) + .flatMap(alarmHistoryService::queryPager); + } + } diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/AlarmRecordController.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/AlarmRecordController.java index e46215361..4cfa45577 100755 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/AlarmRecordController.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/AlarmRecordController.java @@ -3,6 +3,7 @@ import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.tags.Tag; import lombok.AllArgsConstructor; +import org.hswebframework.ezorm.core.param.TermType; import org.hswebframework.web.api.crud.entity.PagerResult; import org.hswebframework.web.api.crud.entity.QueryParamEntity; import org.hswebframework.web.authorization.annotation.Authorize; @@ -11,16 +12,14 @@ import org.hswebframework.web.authorization.annotation.SaveAction; import org.hswebframework.web.crud.service.ReactiveCrudService; import org.hswebframework.web.crud.web.reactive.ReactiveServiceQueryController; +import org.hswebframework.web.exception.NotFoundException; import org.jetlinks.community.rule.engine.alarm.AlarmHandleInfo; import org.jetlinks.community.rule.engine.entity.AlarmHandleHistoryEntity; import org.jetlinks.community.rule.engine.entity.AlarmRecordEntity; import org.jetlinks.community.rule.engine.service.AlarmConfigService; import org.jetlinks.community.rule.engine.service.AlarmHandleHistoryService; import org.jetlinks.community.rule.engine.service.AlarmRecordService; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestBody; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.bind.annotation.*; import reactor.core.publisher.Mono; @RestController @@ -42,6 +41,17 @@ public ReactiveCrudService getService() { return recordService; } + @PostMapping("/{dimensionType}/_query") + @Operation(summary = "按不同维度查询告警记录") + @QueryAction + public Mono> queryPagerByDimensionType(@PathVariable String dimensionType, + @RequestBody Mono query) { + return query + .doOnNext(queryParamEntity -> queryParamEntity + .toNestQuery(q -> q.and(AlarmRecordEntity::getTargetType, TermType.eq, dimensionType))) + .flatMap(this::queryPager1); + } + @PostMapping("/_handle") @Operation(summary = "处理告警") @SaveAction @@ -56,4 +66,51 @@ public Mono handleAlarm(@RequestBody Mono handleInfo) { public Mono> queryHandleHistoryPager(@RequestBody Mono query) { return query.flatMap(handleHistoryService::queryPager); } + + @PostMapping("/{id}/handle-history/_query") + @Operation(summary = "按告警记录查询告警处理历史") + @QueryAction + public Mono> queryHandleHistoryPager( + @PathVariable String id, + @RequestBody Mono query) { + return query + .doOnNext(queryParamEntity -> queryParamEntity + .toNestQuery(q -> q.and(AlarmHandleHistoryEntity::getAlarmRecordId, TermType.eq, id))) + .flatMap(handleHistoryService::queryPager); + } + + @PostMapping("/{dimensionType}/_handle") + @Operation(summary = "按维度处理告警") + @SaveAction + @Deprecated + public Mono handleAlarm(@PathVariable String dimensionType, + @RequestBody Mono handleInfo) { + return handleAlarm(handleInfo); + } + + @PostMapping("/handle-history/{dimensionType}/{recordId}/_query") + @Operation(summary = "根据维度查询告警处理历史") + @QueryAction + public Mono> queryHandleHistoryPager(@PathVariable String dimensionType, + @PathVariable String recordId, + @RequestBody Mono query) { + return recordService + .createQuery() + .where(AlarmRecordEntity::getId, recordId) + .fetchOne() + .switchIfEmpty(Mono.error(() -> new NotFoundException.NoStackTrace("error.alarm_record_not_exist", 500, recordId))) + .flatMap(record -> query.flatMap(handleHistoryService::queryPager)); + } + + private Mono> queryPager1(QueryParamEntity query) { + if (query.getTotal() != null) { + return getService() + .createQuery() + .setParam(query.rePaging(query.getTotal())) + .fetch() + .collectList() + .map(list -> PagerResult.of(query.getTotal(), list, query)); + } + return getService().queryPager(query); + } } diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/SceneController.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/SceneController.java index 5c3000e95..338e1c4c1 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/SceneController.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/SceneController.java @@ -13,14 +13,20 @@ import org.hswebframework.web.authorization.annotation.SaveAction; import org.hswebframework.web.crud.web.reactive.ReactiveServiceQueryController; import org.hswebframework.web.i18n.LocaleUtils; +import org.jetlinks.community.reactorql.aggregation.AggregationSupport; import org.jetlinks.community.rule.engine.service.SceneService; +import org.jetlinks.community.rule.engine.utils.TermColumnUtils; import org.jetlinks.community.rule.engine.web.request.SceneExecuteRequest; +import org.jetlinks.community.rule.engine.web.response.SceneActionInfo; +import org.jetlinks.community.rule.engine.web.response.SceneAggregationInfo; +import org.jetlinks.community.rule.engine.web.response.SceneTriggerInfo; import org.jetlinks.core.device.DeviceRegistry; import org.jetlinks.community.rule.engine.entity.SceneEntity; import org.jetlinks.community.rule.engine.executor.device.DeviceSelectorProvider; import org.jetlinks.community.rule.engine.executor.device.DeviceSelectorProviders; import org.jetlinks.community.rule.engine.scene.*; import org.jetlinks.community.rule.engine.scene.term.TermColumn; +import org.jetlinks.core.metadata.SimplePropertyMetadata; import org.springframework.web.bind.annotation.*; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -110,6 +116,32 @@ public Mono removeScene(@PathVariable String id) { .then(); } + @GetMapping("/trigger/supports") + @Operation(summary = "获取支持的触发器类型") + public Flux getSupportTriggers() { + return SceneUtils + .getSupportTriggers() + .map(SceneTriggerInfo::of); + } + + @GetMapping("/action/supports") + @Operation(summary = "获取支持的动作类型") + public Flux getSupportActions() { + return SceneUtils + .getSupportActions() + .flatMap(provider -> SceneActionInfo.of(provider)); + } + + @GetMapping("/aggregation/supports") + @Operation(summary = "获取支持的聚合函数") + public Flux getSupportAggregations() { + return LocaleUtils + .currentReactive() + .flatMapMany(locale -> Flux + .fromIterable(AggregationSupport.supports.getAll()) + .map(aggregation -> SceneAggregationInfo.of(aggregation, locale))); + } + @PostMapping("/parse-term-column") @Operation(summary = "根据触发器解析出支持的条件列") @QueryAction @@ -124,6 +156,15 @@ public Flux parseTermColumns(@RequestBody Mono ruleMono) }); } + @PostMapping("/parse-array-child-term-column") + @Operation(summary = "解析数组需要的子元素支持的条件列") + @QueryAction + public Flux parseArrayChildTermColumns(@RequestBody Mono metadataMono) { + return metadataMono + .flatMapMany(metadata -> Flux + .fromIterable(TermColumnUtils.parseArrayChildTermColumns(metadata.getValueType()))); + } + @PostMapping("/parse-variables") @Operation(summary = "解析规则中输出的变量") @QueryAction diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/request/ArrayChildTermColumnRequest.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/request/ArrayChildTermColumnRequest.java new file mode 100644 index 000000000..dc115de2d --- /dev/null +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/request/ArrayChildTermColumnRequest.java @@ -0,0 +1,24 @@ +package org.jetlinks.community.rule.engine.web.request; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Getter; +import lombok.Setter; +import org.jetlinks.core.metadata.SimplePropertyMetadata; + +/** + * + * @author zhangji 2025/1/22 + * @since 2.3 + */ +@Getter +@Setter +public class ArrayChildTermColumnRequest { + + @Schema(description = "条件前缀") + private String prefix; + + @Schema(description = "数组属性") + private SimplePropertyMetadata propertyMetadata; + + +} diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/response/AlarmTargetTypeInfo.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/response/AlarmTargetTypeInfo.java index f47c62940..51d1eccf4 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/response/AlarmTargetTypeInfo.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/response/AlarmTargetTypeInfo.java @@ -2,8 +2,11 @@ import lombok.Getter; import lombok.Setter; +import org.hswebframework.web.i18n.LocaleUtils; import org.jetlinks.community.rule.engine.alarm.AlarmTarget; +import java.util.List; + /** * @author bestfeng */ @@ -15,6 +18,7 @@ public class AlarmTargetTypeInfo { private String name; + private List supportTriggers; public static AlarmTargetTypeInfo of(AlarmTarget type) { @@ -26,4 +30,13 @@ public static AlarmTargetTypeInfo of(AlarmTarget type) { return info; } + + public AlarmTargetTypeInfo with(List supportTriggers) { + this.supportTriggers = supportTriggers; + return this; + } + + public String getName() { + return LocaleUtils.resolveMessage("message.rule_engine_alarm_" + id, name); + } } diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/response/SceneActionInfo.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/response/SceneActionInfo.java new file mode 100644 index 000000000..431b97cde --- /dev/null +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/response/SceneActionInfo.java @@ -0,0 +1,55 @@ +package org.jetlinks.community.rule.engine.web.response; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Getter; +import lombok.Setter; +import org.hswebframework.web.i18n.LocaleUtils; +import org.jetlinks.community.rule.engine.scene.SceneActionProvider; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.function.Function; + +/** + * 执行动作类型. + * + * @author zhangji 2025/1/22 + * @since 2.3 + */ +@Getter +@Setter +public class SceneActionInfo { + + @Schema(description = "类型") + private String provider; + + @Schema(description = "名称") + private String name; + + @Schema(description = "说明") + private String description; + + public String getName() { + return LocaleUtils.resolveMessage("message.scene_action_name_" + provider, name); + } + + public String getDescription() { + return LocaleUtils.resolveMessage("message.scene_action_desc_" + provider, description); + } + + public static Flux of(SceneActionProvider actionProvider) { + return Mono + .justOrEmpty(actionProvider.getMode()) + .flatMapIterable(Function.identity()) + .map(SceneActionInfo::of) + .defaultIfEmpty(SceneActionInfo.of(actionProvider.getProvider())); + } + + public static SceneActionInfo of(String provider) { + SceneActionInfo actionInfo = new SceneActionInfo(); + actionInfo.setProvider(provider); + actionInfo.setName(provider); + return actionInfo; + } + +} \ No newline at end of file diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/response/SceneAggregationInfo.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/response/SceneAggregationInfo.java new file mode 100644 index 000000000..0ab05e0d8 --- /dev/null +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/response/SceneAggregationInfo.java @@ -0,0 +1,38 @@ +package org.jetlinks.community.rule.engine.web.response; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Getter; +import lombok.Setter; +import org.hswebframework.web.i18n.LocaleUtils; +import org.jetlinks.community.reactorql.aggregation.AggregationSupport; + +import java.util.Locale; + +/** + * 聚合函数类型. + * + * @author zhangji 2025/1/22 + * @since 2.3 + */ +@Getter +@Setter +public class SceneAggregationInfo { + + @Schema(description = "函数") + private String id; + + @Schema(description = "名称") + private String name; + + public static SceneAggregationInfo of(AggregationSupport support, Locale locale) { + String id = support.getId(); + SceneAggregationInfo aggregationInfo = new SceneAggregationInfo(); + aggregationInfo.setId(id); + aggregationInfo.setName(LocaleUtils + .resolveMessage("message.scene_aggregation_name_" + id, + locale, + support.getName())); + return aggregationInfo; + } + +} diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/response/SceneTriggerInfo.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/response/SceneTriggerInfo.java new file mode 100644 index 000000000..c54aaef8d --- /dev/null +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/response/SceneTriggerInfo.java @@ -0,0 +1,44 @@ +package org.jetlinks.community.rule.engine.web.response; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Getter; +import lombok.Setter; +import org.hswebframework.web.i18n.LocaleUtils; +import org.jetlinks.community.rule.engine.scene.SceneTriggerProvider; + +/** + * 触发器类型. + * + * @author zhangji 2025/1/22 + * @since 2.3 + */ +@Getter +@Setter +public class SceneTriggerInfo { + + @Schema(description = "类型") + private String provider; + + @Schema(description = "名称") + private String name; + + @Schema(description = "说明") + private String description; + + public String getName() { + return LocaleUtils.resolveMessage("message.scene_trigger_name_" + provider, name); + } + + public String getDescription() { + return LocaleUtils.resolveMessage("message.scene_trigger_desc_" + provider, description); + } + + public static SceneTriggerInfo of(SceneTriggerProvider triggerProvider) { + String provider = triggerProvider.getProvider(); + SceneTriggerInfo triggerInfo = new SceneTriggerInfo(); + triggerInfo.setProvider(provider); + triggerInfo.setName(triggerProvider.getName()); + return triggerInfo; + } + +} diff --git a/jetlinks-manager/rule-engine-manager/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/jetlinks-manager/rule-engine-manager/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index 7aa793a3a..f7f7a176d 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/jetlinks-manager/rule-engine-manager/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -1 +1,2 @@ -org.jetlinks.community.rule.engine.configuration.RuleEngineManagerConfiguration \ No newline at end of file +org.jetlinks.community.rule.engine.configuration.RuleEngineManagerConfiguration +org.jetlinks.community.rule.engine.configuration.AlarmTargetConfiguration \ No newline at end of file diff --git a/jetlinks-manager/rule-engine-manager/src/main/resources/i18n/rule-engine-manager/messages_en.properties b/jetlinks-manager/rule-engine-manager/src/main/resources/i18n/rule-engine-manager/messages_en.properties index 315b445c1..516aa1558 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/resources/i18n/rule-engine-manager/messages_en.properties +++ b/jetlinks-manager/rule-engine-manager/src/main/resources/i18n/rule-engine-manager/messages_en.properties @@ -94,7 +94,69 @@ message.property_value_type_recent_nest_desc=When the current value is empty, th message.property_value_type_last=Last value message.property_value_type_last_time=Last time message.property_value_type_last_time_desc=Last reported data time +message.property_value_type_last_time_nest_desc=Data time in last [{0}] message.property_value_type_last_desc=Last reported data value message.property_value_type_last_nest_desc=Data value in last [{0}] - -message.scene_term_column_full_name={1} of {0} \ No newline at end of file +message.scene_triggered_relieve_alarm=scene triggered relieve alarm + +message.scene_term_column_full_name={1} of {0} + +error.alarm_record_not_exist = The alarm record [{0}] not exist + +message.scene_trigger_type=Scene trigger type +message.term_type_scene_manual_desc=When the system receives a manual trigger command, it triggers the scene +message.term_type_scene_manual_actual_desc=Manual trigger +message.term_type_scene_timer_actual_desc=Timer trigger +message.term_size_of_array=array size +message.term_origin_of_array=array origin +message.term_element_of_array=array element + +message.rule_engine_alarm_collector=collector +message.rule_engine_alarm_aiModel=aiModel +message.rule_engine_alarm_aiTask=aiTask +message.rule_engine_alarm_device=device +message.rule_engine_alarm_organization=org +message.rule_engine_alarm_product=product +message.rule_engine_alarm_scene=scene +message.rule_engine_alarm_subscriber_provider_alarm-other=scene alarm +message.rule_engine_alarm_subscriber_provider_alarm-product=product alarm +message.rule_engine_alarm_subscriber_provider_alarm-device=device alarm +message.rule_engine_alarm_subscriber_provider_alarm-org=org alarm +message.rule_engine_alarm_subscriber_provider_alarm=alarm + +#scene-trigger +message.scene_trigger_name_manual=manual trigger +message.scene_trigger_name_device=device trigger +message.scene_trigger_name_timer=timer trigger +message.scene_trigger_name_collector=collector trigger +message.scene_trigger_desc_manual=Suitable for third-party platforms issuing instructions to IoT platforms to control devices +message.scene_trigger_desc_device=Suitable for executing specified actions when device data or behavior meets triggering conditions +message.scene_trigger_desc_timer=Suitable for regularly executing fixed tasks +message.scene_trigger_desc_collector=Suitable for executing specified actions when the collector point meets the triggering conditions +#scene-action +message.scene_action_name_device=device output +message.scene_action_name_device-data=device data +message.scene_action_name_notify=message notification +message.scene_action_name_delay=delay execution +message.scene_action_name_trigger=trigger alarm +message.scene_action_name_relieve=relieve alarm +message.scene_action_name_collector=collector output +message.scene_action_desc_device=Configure device invocation function, read attributes, set attribute rules +message.scene_action_desc_device-data=Get basic information, attributes, tags, events, etc. of the device +message.scene_action_desc_notify=Configure notifications to be sent to designated users via email, DingTalk, WeChat, SMS, etc +message.scene_action_desc_delay=Wait for a period of time before executing subsequent actions +message.scene_action_desc_trigger=Configure trigger alarm rules, which need to be used in conjunction with "alarm configuration" +message.scene_action_desc_relieve=Configure relieve alarm rules, which need to be used in conjunction with "Alarm Configuration" +message.scene_action_desc_collector=Configure reading points and set point rules +#aggregation +message.scene_aggregation_name_COUNT=COUNT +message.scene_aggregation_name_DISTINCT_COUNT=DISTINCT_COUNT +message.scene_aggregation_name_MIN=MIN +message.scene_aggregation_name_MAX=MAX +message.scene_aggregation_name_AVG=AVG +message.scene_aggregation_name_SUM=SUM +message.scene_aggregation_name_FIRST=FIRST +message.scene_aggregation_name_LAST=LAST +message.scene_aggregation_name_MEDIAN=MEDIAN +message.scene_aggregation_name_SPREAD=SPREAD +message.scene_aggregation_name_STDDEV=STDDEV \ No newline at end of file diff --git a/jetlinks-manager/rule-engine-manager/src/main/resources/i18n/rule-engine-manager/messages_zh.properties b/jetlinks-manager/rule-engine-manager/src/main/resources/i18n/rule-engine-manager/messages_zh.properties index eae45086a..822f828b5 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/resources/i18n/rule-engine-manager/messages_zh.properties +++ b/jetlinks-manager/rule-engine-manager/src/main/resources/i18n/rule-engine-manager/messages_zh.properties @@ -27,11 +27,12 @@ error.scene_rule_trigger_device_operation_event_id_cannot_be_null=\u8BBE\u5907\u error.scene_rule_trigger_device_operation_read_property_cannot_be_empty=\u5C5E\u6027\u8BFB\u53D6\u4E0D\u80FD\u4E3A\u7A7A error.scene_rule_trigger_device_operation_write_property_cannot_be_empty=\u4FEE\u6539\u5C5E\u6027\u4E0D\u80FD\u4E3A\u7A7A error.scene_rule_trigger_device_operation_function_id_cannot_be_null=\u529F\u80FDid\u4E0D\u80FD\u4E3A\u7A7A -error.scene_rule_trigger_device_operation_function_parameter_cannot_be_empty=\u529F\u80FD\u53C2\u6570\u4E0D\u80FD\u4E3A\u7A7A error.scene_rule_actions_notify_type_cannot_be_empty=\u901A\u77E5\u7C7B\u578B\u4E0D\u80FD\u4E3A\u7A7A error.scene_rule_actions_notify_id_cannot_be_empty=\u901A\u77E5\u7C7B\u578B\u4E0D\u80FD\u4E3A\u7A7A error.scene_rule_actions_notify_template_cannot_be_blank=\u901A\u77E5\u6A21\u677F\u4E0D\u80FD\u4E3A\u7A7A error.scene_rule_actions_notify_variables_cannot_be_blank=\u901A\u77E5\u53D8\u91CF\u4E0D\u80FD\u4E3A\u7A7A +error.alarm_record_not_fount=\u627E\u4E0D\u5230\u3010{0}\u3011\u62A5\u8B66\u8BB0\u5F55 +error.the_alarm_record_has_been_processed=\u544A\u8B66\u8BB0\u5F55\u5DF2\u88AB\u5904\u7406 #entity-package error.not_set_alarm_rule=\u672A\u8BBE\u7F6E\u544A\u8B66\u89C4\u5219 @@ -99,7 +100,68 @@ message.property_value_type_recent_nest_desc=\u5F53\u524D\u503C\u4E3A\u7A7A\u65F message.property_value_type_last=\u4E0A\u4E00\u503C message.property_value_type_last_time=\u4E0A\u4E00\u6B21\u4E0A\u62A5\u65F6\u95F4 message.property_value_type_last_time_desc=\u4E0A\u4E00\u6B21\u4E0A\u62A5\u6570\u636E\u7684\u65F6\u95F4 +message.property_value_type_last_time_nest_desc=\u4e0a\u4e00\u6b21\u3010{0}\u3011\u4e2d\u4e0a\u62a5\u6570\u636e\u7684\u65f6\u95f4 message.property_value_type_last_desc=\u4E0A\u4E00\u6B21\u4E0A\u62A5\u7684\u6570\u636E\u503C message.property_value_type_last_nest_desc=\u4E0A\u4E00\u6B21\u3010{0}\u3011\u4E2D\u7684\u6570\u636E\u503C +message.scene_triggered_relieve_alarm=\u573A\u666F\u89E6\u53D1\u89E3\u9664\u544A\u8B66 message.scene_term_column_full_name={0}/{1} + +error.alarm_record_not_exist = \u544A\u8B66\u8BB0\u5F55[{0}]\u4E0D\u5B58\u5728 + +message.scene_trigger_type=\u573A\u666F\u89E6\u53D1\u7C7B\u578B +message.term_type_scene_manual_trigger_desc=\u7CFB\u7EDF\u5728\u63A5\u6536\u5230\u624B\u52A8\u89E6\u53D1\u6307\u4EE4\u65F6\uFF0C\u89E6\u53D1\u573A\u666F +message.term_type_scene_manual_actual_desc=\u624B\u52A8\u89E6\u53D1\u544A\u8B66 +message.term_type_scene_timer_actual_desc=\u5B9A\u65F6\u89E6\u53D1\u544A\u8B66 +message.term_size_of_array=\u6570\u7EC4\u957F\u5EA6 +message.term_origin_of_array=\u539F\u59CB\u503C +message.term_element_of_array=\u6570\u503C\u5143\u7D20 + +message.rule_engine_alarm_collector=\u91C7\u96C6\u5668 +message.rule_engine_alarm_aiModel=AI\u6A21\u578B +message.rule_engine_alarm_aiTask=AI\u4EFB\u52A1 +message.rule_engine_alarm_device=\u8BBE\u5907 +message.rule_engine_alarm_organization=\u7EC4\u7EC7 +message.rule_engine_alarm_product=\u4EA7\u54C1 +message.rule_engine_alarm_scene=\u573A\u666F +message.rule_engine_alarm_subscriber_provider_alarm-other=\u573A\u666F\u544A\u8B66 +message.rule_engine_alarm_subscriber_provider_alarm-product=\u4EA7\u54C1\u544A\u8B66 +message.rule_engine_alarm_subscriber_provider_alarm-device=\u8BBE\u5907\u544A\u8B66 +message.rule_engine_alarm_subscriber_provider_alarm-org=\u7EC4\u7EC7\u544A\u8B66 +message.rule_engine_alarm_subscriber_provider_alarm=\u544A\u8B66 +#scene-trigger +message.scene_trigger_name_manual=\u624b\u52a8\u89e6\u53d1 +message.scene_trigger_name_device=\u8bbe\u5907\u89e6\u53d1 +message.scene_trigger_name_timer=\u5b9a\u65f6\u89e6\u53d1 +message.scene_trigger_name_collector=\u91c7\u96c6\u5668\u89e6\u53d1 +message.scene_trigger_desc_manual=\u9002\u7528\u4e8e\u7b2c\u4e09\u65b9\u5e73\u53f0\u5411\u7269\u8054\u7f51\u5e73\u53f0\u4e0b\u53d1\u6307\u4ee4\u63a7\u5236\u8bbe\u5907 +message.scene_trigger_desc_device=\u9002\u7528\u4e8e\u8bbe\u5907\u6570\u636e\u6216\u884c\u4e3a\u6ee1\u8db3\u89e6\u53d1\u6761\u4ef6\u65f6\uff0c\u6267\u884c\u6307\u5b9a\u7684\u52a8\u4f5c +message.scene_trigger_desc_timer=\u9002\u7528\u4e8e\u5b9a\u671f\u6267\u884c\u56fa\u5b9a\u4efb\u52a1 +message.scene_trigger_desc_collector=\u9002\u7528\u4e8e\u91c7\u96c6\u5668\u70b9\u4f4d\u6ee1\u8db3\u89e6\u53d1\u6761\u4ef6\u65f6\uff0c\u6267\u884c\u6307\u5b9a\u7684\u52a8\u4f5c +#scene-action +message.scene_action_name_device=\u8bbe\u5907\u8f93\u51fa +message.scene_action_name_device-data=\u8bbe\u5907\u4fe1\u606f +message.scene_action_name_notify=\u6d88\u606f\u901a\u77e5 +message.scene_action_name_delay=\u5ef6\u8fdf\u6267\u884c +message.scene_action_name_trigger=\u89e6\u53d1\u544a\u8b66 +message.scene_action_name_relieve=\u89e3\u9664\u544a\u8b66 +message.scene_action_name_collector=\u91c7\u96c6\u5668\u8f93\u51fa +message.scene_action_desc_device=\u914d\u7f6e\u8bbe\u5907\u8c03\u7528\u529f\u80fd\u3001\u8bfb\u53d6\u5c5e\u6027\u3001\u8bbe\u7f6e\u5c5e\u6027\u89c4\u5219 +message.scene_action_desc_device-data=\u83b7\u53d6\u8bbe\u5907\u7684\u57fa\u672c\u4fe1\u606f\u3001\u5c5e\u6027\u3001\u6807\u7b7e\u3001\u4e8b\u4ef6\u7b49 +message.scene_action_desc_notify=\u914d\u7f6e\u5411\u6307\u5b9a\u7528\u6237\u53d1\u90ae\u4ef6\u3001\u9489\u9489\u3001\u5fae\u4fe1\u3001\u77ed\u4fe1\u7b49\u901a\u77e5 +message.scene_action_desc_delay=\u7b49\u5f85\u4e00\u6bb5\u65f6\u95f4\u540e\uff0c\u518d\u6267\u884c\u540e\u7eed\u52a8\u4f5c +message.scene_action_desc_trigger=\u914d\u7f6e\u89e6\u53d1\u544a\u8b66\u89c4\u5219\uff0c\u9700\u914d\u5408\u201c\u544a\u8b66\u914d\u7f6e\u201d\u4f7f\u7528 +message.scene_action_desc_relieve=\u914d\u7f6e\u89e3\u9664\u544a\u8b66\u89c4\u5219\uff0c\u9700\u914d\u5408\u201c\u544a\u8b66\u914d\u7f6e\u201d\u4f7f\u7528 +message.scene_action_desc_collector=\u914d\u7f6e\u8bfb\u53d6\u70b9\u4f4d\u3001\u8bbe\u7f6e\u70b9\u4f4d\u89c4\u5219 +#aggregation +message.scene_aggregation_name_COUNT=\u603b\u6570 +message.scene_aggregation_name_DISTINCT_COUNT=\u603b\u6570(\u53bb\u91cd) +message.scene_aggregation_name_MIN=\u6700\u5c0f\u503c +message.scene_aggregation_name_MAX=\u6700\u5927\u503c +message.scene_aggregation_name_AVG=\u5e73\u5747\u503c +message.scene_aggregation_name_SUM=\u603b\u548c +message.scene_aggregation_name_FIRST=\u7b2c\u4e00\u4e2a\u503c +message.scene_aggregation_name_LAST=\u6700\u540e\u4e00\u4e2a\u503c +message.scene_aggregation_name_MEDIAN=\u4e2d\u4f4d\u6570 +message.scene_aggregation_name_SPREAD=\u6781\u5dee +message.scene_aggregation_name_STDDEV=\u6807\u51c6\u5dee \ No newline at end of file diff --git a/pom.xml b/pom.xml index 55bc00338..11a36f063 100644 --- a/pom.xml +++ b/pom.xml @@ -30,7 +30,7 @@ 1.0.6 - 1.0.17 + 1.0.18-SNAPSHOT 1.0.2