Skip to content

Commit

Permalink
feat: add InfluxQL tags support (#584)
Browse files Browse the repository at this point in the history
  • Loading branch information
alespour authored Jun 22, 2023
1 parent 1640d60 commit a95ec06
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 9 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## 6.10.0 [unreleased]

### Bug Fixes
1. [#584](https://github.com/influxdata/influxdb-client-java/pull/584): InfluxQL tags support

### Dependencies

Update dependencies:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
package com.influxdb.query;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -86,6 +87,9 @@ public List<Series> getSeries() {
* Represents one series within the {@link Result} of an InfluxQL query.
*/
public static final class Series {
@Nonnull
private final Map<String, String> tags;

@Nonnull
private final Map<String, Integer> columns;

Expand All @@ -95,10 +99,18 @@ public static final class Series {
private final List<Record> values;

public Series(final @Nonnull String name, final @Nonnull Map<String, Integer> columns) {
this(name, new HashMap<>(), columns);
}

public Series(final @Nonnull String name,
final @Nonnull Map<String, String> tags,
final @Nonnull Map<String, Integer> columns) {
Arguments.checkNotNull(name, "name");
Arguments.checkNotNull(tags, "tags");
Arguments.checkNotNull(columns, "columns");

this.name = name;
this.tags = tags;
this.columns = columns;
this.values = new ArrayList<>();
}
Expand All @@ -111,6 +123,14 @@ public String getName() {
return this.name;
}

/**
* @return the tags
*/
@Nonnull
public Map<String, String> getTags() {
return this.tags;
}

/**
* @return the columns
*/
Expand Down
14 changes: 14 additions & 0 deletions client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,20 @@ public class InfluxQLExample {
}
```

When the data are grouped by tag(s) using `GROUP BY` clause, series tags are accessible
via `InfluxQLQueryResult.Series.getTags()` method, eg.
```java
...
for (InfluxQLQueryResult.Result resultResult : result.getResults()) {
for (InfluxQLQueryResult.Series series : resultResult.getSeries()) {
for (Map.Entry<String, String> tag : series.getTags().entrySet()) {
System.out.println(tag.getKey() + "=" + tag.getValue());
}
}
}
...
```

## Writes

The client offers two types of API to ingesting data:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -110,13 +112,13 @@ static InfluxQLQueryResult readInfluxQLResult(
@Nullable final InfluxQLQueryResult.Series.ValueExtractor valueExtractor
) throws IOException {
List<InfluxQLQueryResult.Result> results = new ArrayList<>();

Map<String, InfluxQLQueryResult.Series> series = null;
Map<List<Object>, InfluxQLQueryResult.Series> series = null;
Map<String, Integer> headerCols = null;
int nameCol = 0;
// The first 3 columns are static (`name`, `tags` and `time`) and got skipped.
final int nameCol = 0;
final int tagsCol = 1;
// The first 2 columns are static (`name`, `tags`) and got skipped.
// All other columns are dynamically parsed
int dynamicColumnsStartIndex = 2;
final int dynamicColumnsStartIndex = 2;

try (CSVParser parser = new CSVParser(reader, CSVFormat.DEFAULT.builder().setIgnoreEmptyLines(false).build())) {
for (CSVRecord csvRecord : parser) {
Expand Down Expand Up @@ -148,10 +150,11 @@ static InfluxQLQueryResult readInfluxQLResult(

} else {
String name = csvRecord.get(nameCol);
Map<String, String> finalTags = parseTags(csvRecord.get(tagsCol));
Map<String, Integer> finalHeaderCols = headerCols;
InfluxQLQueryResult.Series serie = series.computeIfAbsent(
name,
n -> new InfluxQLQueryResult.Series(n, finalHeaderCols)
Arrays.asList(name, finalTags),
n -> new InfluxQLQueryResult.Series(name, finalTags, finalHeaderCols)
);
Object[] values = headerCols.entrySet().stream().map(entry -> {
String value = csvRecord.get(entry.getValue() + dynamicColumnsStartIndex);
Expand All @@ -174,4 +177,16 @@ static InfluxQLQueryResult readInfluxQLResult(
}
return new InfluxQLQueryResult(results);
}

private static Map<String, String> parseTags(@Nonnull final String value) {
final Map<String, String> tags = new HashMap<>();
if (value.length() > 0) {
for (String entry : value.split(",")) {
final String[] kv = entry.split("=");
tags.put(kv[0], kv[1]);
}
}

return tags;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,17 @@ void readInfluxQLResult() throws IOException {
"\n" +
"name,tags,name\n" +
"databases,,measurement-1\n" +
"databases,,measurement-2");
"databases,,measurement-2\n" +
"\n" +
"name,tags,time,usage_user,usage_system\n" +
"cpu,\"region=us-east-1,host=server1\",1483225200,13.57,1.4\n" +
"cpu,\"region=us-east-1,host=server1\",1483225201,14.06,1.7\n" +
"cpu,\"region=us-east-1,host=server2\",1483225200,67.91,1.3\n");

InfluxQLQueryResult result = InfluxQLQueryApiImpl.readInfluxQLResult(reader, NO_CANCELLING, extractValues);

List<InfluxQLQueryResult.Result> results = result.getResults();
Assertions.assertThat(results).hasSize(3);
Assertions.assertThat(results).hasSize(4);
Assertions.assertThat(results.get(0))
.extracting(InfluxQLQueryResult.Result::getSeries)
.satisfies(series -> {
Expand Down Expand Up @@ -127,5 +132,43 @@ void readInfluxQLResult() throws IOException {
.isEqualTo("measurement-2");
});
});

Assertions.assertThat(results.get(3))
.extracting(InfluxQLQueryResult.Result::getSeries)
.satisfies(series -> {
Assertions.assertThat(series).hasSize(2);
Assertions.assertThat(series.get(0))
.satisfies(series1 -> {
Assertions.assertThat(series1.getName()).isEqualTo("cpu");
Assertions.assertThat(series1.getTags()).containsOnlyKeys("region", "host");
Assertions.assertThat(series1.getTags().get("region")).isEqualTo("us-east-1");
Assertions.assertThat(series1.getTags().get("host")).isEqualTo("server1");
Assertions.assertThat(series1.getColumns()).containsOnlyKeys("time","usage_user","usage_system");
Assertions.assertThat(series1.getValues()).hasSize(2);

Assertions.assertThat( series1.getValues().get(0).getValueByKey("usage_user"))
.isEqualTo("13.57");
Assertions.assertThat( series1.getValues().get(0).getValueByKey("usage_system"))
.isEqualTo("1.4");
Assertions.assertThat( series1.getValues().get(1).getValueByKey("usage_user"))
.isEqualTo("14.06");
Assertions.assertThat( series1.getValues().get(1).getValueByKey("usage_system"))
.isEqualTo("1.7");
});
Assertions.assertThat(series.get(1))
.satisfies(series2 -> {
Assertions.assertThat(series2.getName()).isEqualTo("cpu");
Assertions.assertThat(series2.getTags()).containsOnlyKeys("region", "host");
Assertions.assertThat(series2.getTags().get("region")).isEqualTo("us-east-1");
Assertions.assertThat(series2.getTags().get("host")).isEqualTo("server2");
Assertions.assertThat(series2.getColumns()).containsOnlyKeys("time","usage_user","usage_system");
Assertions.assertThat(series2.getValues()).hasSize(1);

Assertions.assertThat( series2.getValues().get(0).getValueByKey("usage_user"))
.isEqualTo("67.91");
Assertions.assertThat( series2.getValues().get(0).getValueByKey("usage_system"))
.isEqualTo("1.3");
});
});
}
}

0 comments on commit a95ec06

Please sign in to comment.