Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removing default integtest.sh (1.1). #178

Closed
wants to merge 26 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
83663a5
Added release notes for OpenSearch 1.0.0.0. (#123) (#124)
adityaj1107 Jul 1, 2021
a9136cb
Add Integtest.sh for OpenSearch integtest setups (#121)
peterzhuamazon Jul 9, 2021
0c4c2fc
Remove default assignee (#127)
lezzago Jul 12, 2021
ff57c54
Removing All Usages of Action Get Method Calls and adding the listene…
adityaj1107 Jul 22, 2021
751aabd
Fix snapshot build and increment to 1.1.0. (#142)
dblock Aug 17, 2021
352d272
Refactor MonitorRunner (#143)
qreshi Aug 20, 2021
764c14a
Update Bucket-Level Alerting RFC (#145)
qreshi Aug 20, 2021
25194d6
Add BucketSelector pipeline aggregation extension (#144)
qreshi Aug 23, 2021
dd8b281
Add AggregationResultBucket (#148)
qreshi Aug 23, 2021
ecd283f
Add ActionExecutionPolicy (#149)
qreshi Aug 24, 2021
e8c474f
Refactor Monitor and Trigger to split into Query-Level and Bucket-Lev…
qreshi Aug 25, 2021
d31f0a1
Update InputService for Bucket-Level Alerting (#152)
qreshi Aug 26, 2021
bf472b2
Update TriggerService for Bucket-Level Alerting (#153)
qreshi Aug 26, 2021
2351783
Update AlertService for Bucket-Level Alerting (#154)
qreshi Aug 26, 2021
95306f0
Add worksheets to help with testing (#151)
qreshi Aug 26, 2021
28f401d
Update MonitorRunner for Bucket-Level Alerting (#155)
qreshi Aug 27, 2021
0eed799
Fix ktlint formatting issues (#156)
qreshi Aug 27, 2021
2d60ede
Execute Actions on runTrigger exceptions for Bucket-Level Monitor (#157)
qreshi Aug 27, 2021
8a1dc1f
Skip execution of Actions on ACKNOWLEDGED Alerts for Bucket-Level Mon…
qreshi Aug 30, 2021
6aa4fdb
Return first page of input results in MonitorRunResult for Bucket-Lev…
qreshi Aug 30, 2021
cc0fb6f
Add setting to limit per alert action executions and don't save Alert…
qreshi Aug 31, 2021
6f7afa9
Fix bug in paginating multiple bucket paths for Bucket-Level Monitor …
qreshi Sep 1, 2021
faac2bd
Various bug fixes pertaining to throttling on PER_ALERT, saving COMPL…
qreshi Sep 1, 2021
8024b8b
Return only monitors for /monitors/_search. (#162)
skkosuri-amzn Sep 1, 2021
d5dbdd6
Resolve default for ActionExecutionPolicy at runtime (#165)
qreshi Sep 1, 2021
ebab4da
Add release notes for 1.1.0.0 release (#166)
qreshi Sep 8, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Update TriggerService for Bucket-Level Alerting (#153)
* Update TriggerService for Bucket-Level Alerting

Signed-off-by: Mohammad Qureshi <[email protected]>

* Remove client from TriggerService

Signed-off-by: Mohammad Qureshi <[email protected]>
  • Loading branch information
qreshi authored Aug 26, 2021
commit bf472b29302b42fd797162ab50c33e4d987cb1bc
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
.registerThreadPool(threadPool)
.registerAlertIndices(alertIndices)
.registerInputService(InputService(client, scriptService, xContentRegistry))
.registerTriggerService(TriggerService(client, scriptService))
.registerTriggerService(TriggerService(scriptService))
.registerAlertService(AlertService(client, xContentRegistry, alertIndices))
.registerConsumers()
.registerDestinationSettings()
Expand Down
80 changes: 71 additions & 9 deletions alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,36 +12,98 @@
package org.opensearch.alerting

import org.apache.logging.log4j.LogManager
import org.opensearch.alerting.aggregation.bucketselectorext.BucketSelectorIndices.Fields.BUCKET_INDICES
import org.opensearch.alerting.aggregation.bucketselectorext.BucketSelectorIndices.Fields.PARENT_BUCKET_PATH
import org.opensearch.alerting.model.AggregationResultBucket
import org.opensearch.alerting.model.Alert
import org.opensearch.alerting.model.BucketLevelTrigger
import org.opensearch.alerting.model.BucketLevelTriggerRunResult
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.model.Trigger
import org.opensearch.alerting.model.TriggerRunResult
import org.opensearch.alerting.script.TriggerExecutionContext
import org.opensearch.alerting.model.QueryLevelTrigger
import org.opensearch.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.script.TriggerScript
import org.opensearch.client.Client
import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.script.ScriptService
import org.opensearch.search.aggregations.Aggregation
import org.opensearch.search.aggregations.Aggregations
import org.opensearch.search.aggregations.support.AggregationPath
import java.lang.IllegalArgumentException

/** Service that handles executing Triggers */
class TriggerService(val client: Client, val scriptService: ScriptService) {
class TriggerService(val scriptService: ScriptService) {

private val logger = LogManager.getLogger(TriggerService::class.java)

fun isTriggerActionable(ctx: TriggerExecutionContext, result: TriggerRunResult): Boolean {
fun isQueryLevelTriggerActionable(ctx: QueryLevelTriggerExecutionContext, result: QueryLevelTriggerRunResult): Boolean {
// Suppress actions if the current alert is acknowledged and there are no errors.
val suppress = ctx.alert?.state == Alert.State.ACKNOWLEDGED && result.error == null && ctx.error == null
return result.triggered && !suppress
}

fun runTrigger(monitor: Monitor, trigger: Trigger, ctx: TriggerExecutionContext): TriggerRunResult {
fun runQueryLevelTrigger(
monitor: Monitor,
trigger: QueryLevelTrigger,
ctx: QueryLevelTriggerExecutionContext
): QueryLevelTriggerRunResult {
return try {
val triggered = scriptService.compile(trigger.condition, TriggerScript.CONTEXT)
.newInstance(trigger.condition.params)
.execute(ctx)
TriggerRunResult(trigger.name, triggered, null)
QueryLevelTriggerRunResult(trigger.name, triggered, null)
} catch (e: Exception) {
logger.info("Error running script for monitor ${monitor.id}, trigger: ${trigger.id}", e)
// if the script fails we need to send an alert so set triggered = true
TriggerRunResult(trigger.name, true, e)
QueryLevelTriggerRunResult(trigger.name, true, e)
}
}

@Suppress("UNCHECKED_CAST")
fun runBucketLevelTrigger(
monitor: Monitor,
trigger: BucketLevelTrigger,
ctx: BucketLevelTriggerExecutionContext
): BucketLevelTriggerRunResult {
return try {
val bucketIndices =
((ctx.results[0][Aggregations.AGGREGATIONS_FIELD] as HashMap<*, *>)[trigger.id] as HashMap<*, *>)[BUCKET_INDICES] as List<*>
val parentBucketPath = ((ctx.results[0][Aggregations.AGGREGATIONS_FIELD] as HashMap<*, *>)
.get(trigger.id) as HashMap<*, *>)[PARENT_BUCKET_PATH] as String
val aggregationPath = AggregationPath.parse(parentBucketPath)
// TODO test this part by passing sub-aggregation path
var parentAgg = (ctx.results[0][Aggregations.AGGREGATIONS_FIELD] as HashMap<*, *>)
aggregationPath.pathElementsAsStringList.forEach { sub_agg ->
parentAgg = (parentAgg[sub_agg] as HashMap<*, *>)
}
val buckets = parentAgg[Aggregation.CommonFields.BUCKETS.preferredName] as List<*>
val selectedBuckets = mutableMapOf<String, AggregationResultBucket>()
for (bucketIndex in bucketIndices) {
val bucketDict = buckets[bucketIndex as Int] as Map<String, Any>
val bucketKeyValuesList = getBucketKeyValuesList(bucketDict)
val aggResultBucket = AggregationResultBucket(parentBucketPath, bucketKeyValuesList, bucketDict)
selectedBuckets[aggResultBucket.getBucketKeysHash()] = aggResultBucket
}
BucketLevelTriggerRunResult(trigger.name, null, selectedBuckets)
} catch (e: Exception) {
logger.info("Error running script for monitor ${monitor.id}, trigger: ${trigger.id}", e)
// TODO empty map here with error should be treated in the same way as QueryLevelTrigger with error running script
BucketLevelTriggerRunResult(trigger.name, e, emptyMap())
}
}

@Suppress("UNCHECKED_CAST")
private fun getBucketKeyValuesList(bucket: Map<String, Any>): List<String> {
val keyField = Aggregation.CommonFields.KEY.preferredName
val keyValuesList = mutableListOf<String>()
when {
bucket[keyField] is String -> keyValuesList.add(bucket[keyField] as String)
// In the case where the key field is an object with multiple values (such as a composite aggregation with more than one source)
// the values will be iterated through and converted into a string
bucket[keyField] is Map<*, *> -> (bucket[keyField] as Map<String, Any>).values.map { keyValuesList.add(it as String) }
else -> throw IllegalArgumentException("Unexpected format for key in bucket [$bucket]")
}

return keyValuesList
}
}