Prometheus select metric and stats queries. #956

Merged: 3 commits, Nov 3, 2022
@@ -203,6 +203,12 @@ public LogicalPlan visitTableFunction(TableFunction node, AnalysisContext context
TableFunctionImplementation tableFunctionImplementation
= (TableFunctionImplementation) repository.compile(
catalogSchemaIdentifierNameResolver.getCatalogName(), functionName, arguments);
    context.push();
    TypeEnvironment curEnv = context.peek();
    Table table = tableFunctionImplementation.applyArguments();
    table.getFieldTypes().forEach((k, v) -> curEnv.define(new Symbol(Namespace.FIELD_NAME, k), v));
    curEnv.define(new Symbol(Namespace.INDEX_NAME,
        catalogSchemaIdentifierNameResolver.getIdentifierName()), STRUCT);
    return new LogicalRelation(catalogSchemaIdentifierNameResolver.getIdentifierName(), table);
  }
113 changes: 113 additions & 0 deletions docs/user/ppl/admin/prometheus_connector.rst
@@ -72,3 +72,116 @@ AWSSigV4 Auth::
}
}]

PPL Query Support for Prometheus Connector
==========================================

Metric as a Table
---------------------------
Each connector has to abstract the underlying datasource constructs into a table as part of its interface contract with the PPL query engine.
The Prometheus connector abstracts each metric as a table whose columns are ``@value``, ``@timestamp``, ``label1``, ``label2``, and so on.
``@value`` holds the metric measurement and ``@timestamp`` holds the time at which the sample was collected. Labels are the tags associated with the queried metric.
For example, ``handler``, ``code``, ``instance``, and ``job`` are the labels associated with the ``prometheus_http_requests_total`` metric. With this abstraction, Prometheus
data can be queried using PPL syntax, similar to OpenSearch indices.

Sample example::

> source = my_prometheus.prometheus_http_requests_total;

+------------+------------------------+--------------------------------+---------------+-------------+-------------+
| @value | @timestamp | handler | code | instance | job |
|------------+------------------------+--------------------------------+---------------+-------------+-------------|
| 5 | "2022-11-03 07:18:14" | "/-/ready" | 200 | 192.15.1.1 | prometheus |
| 3 | "2022-11-03 07:18:24" | "/-/ready" | 200 | 192.15.1.1 | prometheus |
| 7 | "2022-11-03 07:18:34" | "/-/ready" | 200 | 192.15.1.1 | prometheus |
| 2 | "2022-11-03 07:18:44" | "/-/ready" | 400 | 192.15.2.1 | prometheus |
| 9 | "2022-11-03 07:18:54" | "/-/promql" | 400 | 192.15.2.1 | prometheus |
    | 11         | "2022-11-03 07:19:04"  | "/-/metrics"                   | 500           | 192.15.2.1  | prometheus  |
+------------+------------------------+--------------------------------+---------------+-------------+-------------+
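The flattening behind this table shape, one output row per sample with the label set fanned out into columns, can be sketched in plain Java. The class and method names below are illustrative only, not the connector's actual types:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch: turn one Prometheus time series (a label set plus a list of
// (timestamp, value) samples) into rows shaped like the table above:
// @value, @timestamp, then one column per label.
public class MetricFlattener {
  public static List<Map<String, Object>> toRows(
      Map<String, String> labels, List<double[]> samples) {
    List<Map<String, Object>> rows = new ArrayList<>();
    for (double[] sample : samples) {           // sample = {epochSeconds, value}
      Map<String, Object> row = new LinkedHashMap<>();
      row.put("@value", sample[1]);
      row.put("@timestamp", (long) sample[0]);  // epoch seconds
      row.putAll(labels);                       // handler, code, instance, job, ...
      rows.add(row);
    }
    return rows;
  }
}
```

Each series contributes as many rows as it has samples, which is why repeated label values appear across rows in the sample output.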



Default time range and resolution
---------------------------------
Time range and resolution are required parameters for the Prometheus query APIs. They are determined from the PPL commands in the following manner.
* The time range is determined from the filter clause on ``@timestamp``. If there is no such filter clause, the time range defaults to 1h with the end time set to ``now()``.
* For ``stats`` queries, the resolution is determined by the ``span(@timestamp,15s)`` expression. For plain select queries, the resolution is automatically derived from the time range.
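These defaulting rules can be sketched in Java. The helper names and the data-point cap below are assumptions for illustration, not the connector's actual API:

```java
import java.time.Duration;
import java.time.Instant;

// Sketch of the defaulting rules: no @timestamp filter means a 1h window
// ending now; for plain select queries the step is derived from the range
// so the result stays within a bounded number of data points.
public class QueryDefaults {
  static final int MAX_DATA_POINTS = 250;  // assumed cap, for illustration

  // Returns {startTime, endTime} when no @timestamp filter is present.
  public static Instant[] defaultRange(Instant now) {
    return new Instant[] {now.minus(Duration.ofHours(1)), now};
  }

  // Derives a step from the time range; at least 1 second.
  public static Duration autoResolution(Instant start, Instant end) {
    long seconds = Duration.between(start, end).getSeconds();
    return Duration.ofSeconds(Math.max(1, seconds / MAX_DATA_POINTS));
  }
}
```

With the default 1h window and a cap of 250 points, the derived step comes out to roughly 14 seconds.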

Prometheus Connector Limitations
--------------------------------
* Only one aggregation is supported per ``stats`` command.
* A span expression is compulsory in the ``stats`` command.
* ``AVG``, ``MAX``, ``MIN``, ``SUM``, and ``COUNT`` are the only aggregations supported by the Prometheus connector.

Example queries
---------------

1. Metric selection query::

> source = my_prometheus.prometheus_http_requests_total
+------------+------------------------+--------------------------------+---------------+-------------+-------------+
| @value | @timestamp | handler | code | instance | job |
|------------+------------------------+--------------------------------+---------------+-------------+-------------|
| 5 | "2022-11-03 07:18:14" | "/-/ready" | 200 | 192.15.1.1 | prometheus |
| 3 | "2022-11-03 07:18:24" | "/-/ready" | 200 | 192.15.1.1 | prometheus |
| 7 | "2022-11-03 07:18:34" | "/-/ready" | 200 | 192.15.1.1 | prometheus |
| 2 | "2022-11-03 07:18:44" | "/-/ready" | 400 | 192.15.2.1 | prometheus |
| 9 | "2022-11-03 07:18:54" | "/-/promql" | 400 | 192.15.2.1 | prometheus |
    | 11         | "2022-11-03 07:19:04"  | "/-/metrics"                   | 500           | 192.15.2.1  | prometheus  |
+------------+------------------------+--------------------------------+---------------+-------------+-------------+

2. Metric selection query with specific dimensions::

> source = my_prometheus.prometheus_http_requests_total | where handler='/-/ready' and code='200'
+------------+------------------------+--------------------------------+---------------+-------------+-------------+
| @value | @timestamp | handler | code | instance | job |
|------------+------------------------+--------------------------------+---------------+-------------+-------------|
| 5 | "2022-11-03 07:18:14" | "/-/ready" | 200 | 192.15.1.1 | prometheus |
| 3 | "2022-11-03 07:18:24" | "/-/ready" | 200 | 192.15.1.1 | prometheus |
| 7 | "2022-11-03 07:18:34" | "/-/ready" | 200 | 192.15.1.1 | prometheus |
| 2 | "2022-11-03 07:18:44" | "/-/ready" | 200 | 192.15.2.1 | prometheus |
| 9 | "2022-11-03 07:18:54" | "/-/ready" | 200 | 192.15.2.1 | prometheus |
    | 11         | "2022-11-03 07:19:04"  | "/-/ready"                     | 200           | 192.15.2.1  | prometheus  |
+------------+------------------------+--------------------------------+---------------+-------------+-------------+

3. Average aggregation on a metric::

> source = my_prometheus.prometheus_http_requests_total | stats avg(@value) by span(@timestamp,15s)
+------------+------------------------+
| avg(@value)| span(@timestamp,15s) |
|------------+------------------------+
| 5 | "2022-11-03 07:18:14" |
| 3 | "2022-11-03 07:18:24" |
| 7 | "2022-11-03 07:18:34" |
| 2 | "2022-11-03 07:18:44" |
| 9 | "2022-11-03 07:18:54" |
    | 11         | "2022-11-03 07:19:04"  |
+------------+------------------------+

4. Average aggregation grouped by dimensions::

> source = my_prometheus.prometheus_http_requests_total | stats avg(@value) by span(@timestamp,15s), handler, code
+------------+------------------------+--------------------------------+---------------+
| avg(@value)| span(@timestamp,15s) | handler | code |
|------------+------------------------+--------------------------------+---------------+
| 5 | "2022-11-03 07:18:14" | "/-/ready" | 200 |
| 3 | "2022-11-03 07:18:24" | "/-/ready" | 200 |
| 7 | "2022-11-03 07:18:34" | "/-/ready" | 200 |
| 2 | "2022-11-03 07:18:44" | "/-/ready" | 400 |
| 9 | "2022-11-03 07:18:54" | "/-/promql" | 400 |
    | 11         | "2022-11-03 07:19:04"  | "/-/metrics"                   | 500           |
+------------+------------------------+--------------------------------+---------------+

5. Count aggregation query::

> source = my_prometheus.prometheus_http_requests_total | stats count() by span(@timestamp,15s), handler, code
+------------+------------------------+--------------------------------+---------------+
| count() | span(@timestamp,15s) | handler | code |
|------------+------------------------+--------------------------------+---------------+
| 5 | "2022-11-03 07:18:14" | "/-/ready" | 200 |
| 3 | "2022-11-03 07:18:24" | "/-/ready" | 200 |
| 7 | "2022-11-03 07:18:34" | "/-/ready" | 200 |
| 2 | "2022-11-03 07:18:44" | "/-/ready" | 400 |
| 9 | "2022-11-03 07:18:54" | "/-/promql" | 400 |
    | 11         | "2022-11-03 07:19:04"  | "/-/metrics"                   | 500           |
+------------+------------------------+--------------------------------+---------------+
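For intuition, a ``stats`` query such as example 4 maps naturally onto a PromQL aggregation over a range window. The builder below is a hedged sketch of such a translation; the exact PromQL shape the connector emits may differ:

```java
// Sketch: translate "stats avg(@value) by span(@timestamp,15s), handler, code"
// into a PromQL string of the form
//   avg by (handler, code) (avg_over_time(metric[15s]))
// The *_over_time functions (avg_over_time, max_over_time, ...) are real
// PromQL functions; the builder itself is illustrative, not connector code.
public class PromQLBuilder {
  public static String aggregate(
      String metric, String agg, String window, String... groupBy) {
    String by = groupBy.length == 0 ? "" : " by (" + String.join(", ", groupBy) + ")";
    return agg + by + " (" + agg + "_over_time(" + metric + "[" + window + "]))";
  }
}
```

The span width becomes the range window, and the non-span group-by fields become the PromQL ``by`` clause.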

3 changes: 1 addition & 2 deletions docs/user/ppl/cmd/describe.rst
@@ -75,11 +75,10 @@ The example retrieves table info for ``prometheus_http_requests_total`` metric i
PPL query::

os> describe my_prometheus.prometheus_http_requests_total;
fetched rows / total rows = 7/7
fetched rows / total rows = 6/6
+-----------------+----------------+--------------------------------+---------------+-------------+
| TABLE_CATALOG | TABLE_SCHEMA | TABLE_NAME | COLUMN_NAME | DATA_TYPE |
|-----------------+----------------+--------------------------------+---------------+-------------|
| my_prometheus | default | prometheus_http_requests_total | @labels | keyword |
| my_prometheus | default | prometheus_http_requests_total | handler | keyword |
| my_prometheus | default | prometheus_http_requests_total | code | keyword |
| my_prometheus | default | prometheus_http_requests_total | instance | keyword |
@@ -101,7 +101,6 @@ public void testDescribeCommandWithPrometheusCatalog() throws IOException {
columnName("DATA_TYPE")
);
verifyDataRows(result,
rows("my_prometheus", "default", "prometheus_http_requests_total", "@labels", "keyword"),
rows("my_prometheus", "default", "prometheus_http_requests_total", "handler", "keyword"),
rows("my_prometheus", "default", "prometheus_http_requests_total", "code", "keyword"),
rows("my_prometheus", "default", "prometheus_http_requests_total", "instance", "keyword"),
@@ -0,0 +1,158 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.sql.ppl;

import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.TIMESTAMP;
import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.VALUE;
import static org.opensearch.sql.util.MatcherUtils.schema;
import static org.opensearch.sql.util.MatcherUtils.verifySchema;

import lombok.SneakyThrows;
import org.apache.commons.lang3.StringUtils;
import org.json.JSONArray;
import org.json.JSONObject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class PrometheusCatalogCommandsIT extends PPLIntegTestCase {

@Test
@SneakyThrows
public void testSourceMetricCommand() {
JSONObject response =
executeQuery("source=my_prometheus.prometheus_http_requests_total");
verifySchema(response,
schema(VALUE, "double"),
schema(TIMESTAMP, "timestamp"),
schema("handler", "string"),
schema("code", "string"),
schema("instance", "string"),
schema("job", "string"));
Assertions.assertTrue(response.getInt("size") > 0);
Assertions.assertEquals(6, response.getJSONArray("datarows").getJSONArray(0).length());
JSONArray firstRow = response.getJSONArray("datarows").getJSONArray(0);
for (int i = 0; i < firstRow.length(); i++) {
Assertions.assertNotNull(firstRow.get(i));
Assertions.assertTrue(StringUtils.isNotEmpty(firstRow.get(i).toString()));
}
}

@Test
@SneakyThrows
public void testMetricAvgAggregationCommand() {
JSONObject response =
executeQuery("source=my_prometheus.prometheus_http_requests_total | stats avg(@value) by span(@timestamp, 15s), handler, job");
verifySchema(response,
schema("avg(@value)", "double"),
schema("span(@timestamp,15s)", "timestamp"),
schema("handler", "string"),
schema("job", "string"));
Assertions.assertTrue(response.getInt("size") > 0);
Assertions.assertEquals(4, response.getJSONArray("datarows").getJSONArray(0).length());
JSONArray firstRow = response.getJSONArray("datarows").getJSONArray(0);
for (int i = 0; i < firstRow.length(); i++) {
Assertions.assertNotNull(firstRow.get(i));
Assertions.assertTrue(StringUtils.isNotEmpty(firstRow.get(i).toString()));
}
}

@Test
@SneakyThrows
public void testMetricAvgAggregationCommandWithAlias() {
JSONObject response =
executeQuery("source=my_prometheus.prometheus_http_requests_total | stats avg(@value) as agg by span(@timestamp, 15s), handler, job");
verifySchema(response,
schema("agg", "double"),
schema("span(@timestamp,15s)", "timestamp"),
schema("handler", "string"),
schema("job", "string"));
Assertions.assertTrue(response.getInt("size") > 0);
Assertions.assertEquals(4, response.getJSONArray("datarows").getJSONArray(0).length());
JSONArray firstRow = response.getJSONArray("datarows").getJSONArray(0);
for (int i = 0; i < firstRow.length(); i++) {
Assertions.assertNotNull(firstRow.get(i));
Assertions.assertTrue(StringUtils.isNotEmpty(firstRow.get(i).toString()));
}
}


@Test
@SneakyThrows
public void testMetricMaxAggregationCommand() {
JSONObject response =
executeQuery("source=my_prometheus.prometheus_http_requests_total | stats max(@value) by span(@timestamp, 15s)");
verifySchema(response,
schema("max(@value)", "double"),
schema("span(@timestamp,15s)", "timestamp"));
Assertions.assertTrue(response.getInt("size") > 0);
Assertions.assertEquals(2, response.getJSONArray("datarows").getJSONArray(0).length());
JSONArray firstRow = response.getJSONArray("datarows").getJSONArray(0);
for (int i = 0; i < firstRow.length(); i++) {
Assertions.assertNotNull(firstRow.get(i));
Assertions.assertTrue(StringUtils.isNotEmpty(firstRow.get(i).toString()));
}
}


@Test
@SneakyThrows
public void testMetricMinAggregationCommand() {
JSONObject response =
executeQuery("source=my_prometheus.prometheus_http_requests_total | stats min(@value) by span(@timestamp, 15s), handler");
verifySchema(response,
schema("min(@value)", "double"),
schema("span(@timestamp,15s)", "timestamp"),
schema("handler", "string"));
Assertions.assertTrue(response.getInt("size") > 0);
Assertions.assertEquals(3, response.getJSONArray("datarows").getJSONArray(0).length());
JSONArray firstRow = response.getJSONArray("datarows").getJSONArray(0);
for (int i = 0; i < firstRow.length(); i++) {
Assertions.assertNotNull(firstRow.get(i));
Assertions.assertTrue(StringUtils.isNotEmpty(firstRow.get(i).toString()));
}
}

@Test
@SneakyThrows
public void testMetricCountAggregationCommand() {
JSONObject response =
executeQuery("source=my_prometheus.prometheus_http_requests_total | stats count() by span(@timestamp, 15s), handler, job");
verifySchema(response,
schema("count()", "integer"),
schema("span(@timestamp,15s)", "timestamp"),
schema("handler", "string"),
schema("job", "string"));
Assertions.assertTrue(response.getInt("size") > 0);
Assertions.assertEquals(4, response.getJSONArray("datarows").getJSONArray(0).length());
JSONArray firstRow = response.getJSONArray("datarows").getJSONArray(0);
for (int i = 0; i < firstRow.length(); i++) {
Assertions.assertNotNull(firstRow.get(i));
Assertions.assertTrue(StringUtils.isNotEmpty(firstRow.get(i).toString()));
}
}

@Test
@SneakyThrows
public void testMetricSumAggregationCommand() {
JSONObject response =
executeQuery("source=my_prometheus.prometheus_http_requests_total | stats sum(@value) by span(@timestamp, 15s), handler, job");
verifySchema(response,
schema("sum(@value)", "double"),
schema("span(@timestamp,15s)", "timestamp"),
schema("handler", "string"),
schema("job", "string"));
Assertions.assertTrue(response.getInt("size") > 0);
Assertions.assertEquals(4, response.getJSONArray("datarows").getJSONArray(0).length());
JSONArray firstRow = response.getJSONArray("datarows").getJSONArray(0);
for (int i = 0; i < firstRow.length(); i++) {
Assertions.assertNotNull(firstRow.get(i));
Assertions.assertTrue(StringUtils.isNotEmpty(firstRow.get(i).toString()));
}
}

}
@@ -79,7 +79,7 @@ public void loadConnectors(Settings settings) {
} catch (IOException e) {
LOG.error("Catalog Configuration File uploaded is malformed. Verify and re-upload.", e);
} catch (Throwable e) {
LOG.error("Catalog constructed failed.", e);
LOG.error("Catalog construction failed.", e);
}
}
return null;
2 changes: 0 additions & 2 deletions ppl/src/main/antlr/OpenSearchPPLParser.g4
@@ -161,8 +161,6 @@ mlArg
fromClause
: SOURCE EQUAL tableSourceClause
| INDEX EQUAL tableSourceClause
| SOURCE EQUAL tableFunction
| INDEX EQUAL tableFunction
;

tableSourceClause
19 changes: 5 additions & 14 deletions ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java
@@ -34,6 +34,7 @@
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.Generated;
import lombok.RequiredArgsConstructor;
import org.antlr.v4.runtime.ParserRuleContext;
import org.antlr.v4.runtime.Token;
@@ -348,11 +349,7 @@ public UnresolvedPlan visitTopCommand(TopCommandContext ctx) {
*/
@Override
public UnresolvedPlan visitFromClause(FromClauseContext ctx) {
if (ctx.tableFunction() != null) {
return visitTableFunction(ctx.tableFunction());
} else {
return visitTableSourceClause(ctx.tableSourceClause());
}
return visitTableSourceClause(ctx.tableSourceClause());
}

@Override
@@ -363,16 +360,10 @@ public UnresolvedPlan visitTableSourceClause(TableSourceClauseContext ctx) {
}

@Override
@Generated // Excluded from JaCoCo coverage; will be removed: https://github.com/opensearch-project/sql/issues/1019
public UnresolvedPlan visitTableFunction(OpenSearchPPLParser.TableFunctionContext ctx) {
ImmutableList.Builder<UnresolvedExpression> builder = ImmutableList.builder();
ctx.functionArgs().functionArg().forEach(arg
-> {
String argName = (arg.ident() != null) ? arg.ident().getText() : null;
builder.add(
new UnresolvedArgument(argName,
this.internalVisitExpression(arg.valueExpression())));
});
return new TableFunction(this.internalVisitExpression(ctx.qualifiedName()), builder.build());
//<TODO>
return null;
}

/**