Skip to content

Commit

Permalink
[influxdb] Fix queries with data migrated from InfluxDB1 without item…
Browse files Browse the repository at this point in the history
… tags (#10937)

Signed-off-by: Joan Pujol <[email protected]>
  • Loading branch information
lujop authored Jul 6, 2021
1 parent cdb8d46 commit 26258e8
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@ public class InfluxDBConstants {
public static final String TAG_CATEGORY_NAME = "category";
public static final String TAG_TYPE_NAME = "type";
public static final String TAG_LABEL_NAME = "label";
public static final String FIELD_MEASUREMENT_NAME = "_measurement";
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,18 @@ public String createQuery(FilterCriteria criteria, String retentionPolicy) {
String itemName = criteria.getItemName();
if (itemName != null) {
String measurementName = calculateMeasurementName(itemName);
boolean needsToUseItemTagName = !measurementName.equals(itemName);

flux = flux.filter(measurement().equal(measurementName));
if (!measurementName.equals(itemName)) {
flux = flux.filter(tag("item").equal(itemName));
if (needsToUseItemTagName) {
flux = flux.filter(tag(TAG_ITEM_NAME).equal(itemName));
}

if (needsToUseItemTagName)
flux = flux.keep(new String[] { FIELD_MEASUREMENT_NAME, COLUMN_TIME_NAME_V2, COLUMN_VALUE_NAME_V2,
TAG_ITEM_NAME });
else
flux = flux.keep(new String[] { FIELD_MEASUREMENT_NAME, COLUMN_TIME_NAME_V2, COLUMN_VALUE_NAME_V2 });
}

if (criteria.getState() != null && criteria.getOperator() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,10 @@ public void testSimpleItemQueryWithoutParams() {
assertThat(queryV1, equalTo("SELECT \"value\"::field,\"item\"::tag FROM origin.sampleItem;"));

String queryV2 = instanceV2.createQuery(criteria, RETENTION_POLICY);
assertThat(queryV2, equalTo("from(bucket:\"origin\")\n\t" + "|> range(start:-100y)\n\t"
+ "|> filter(fn: (r) => r[\"_measurement\"] == \"sampleItem\")"));
assertThat(queryV2,
equalTo("from(bucket:\"origin\")\n\t" + "|> range(start:-100y)\n\t"
+ "|> filter(fn: (r) => r[\"_measurement\"] == \"sampleItem\")\n\t"
+ "|> keep(columns:[\"_measurement\", \"_time\", \"_value\"])"));
}

@Test
Expand Down Expand Up @@ -112,7 +114,8 @@ public void testRangeCriteria() {
String queryV2 = instanceV2.createQuery(criteria, RETENTION_POLICY);
String expectedQueryV2 = String.format(
"from(bucket:\"origin\")\n\t" + "|> range(start:%s, stop:%s)\n\t"
+ "|> filter(fn: (r) => r[\"_measurement\"] == \"sampleItem\")",
+ "|> filter(fn: (r) => r[\"_measurement\"] == \"sampleItem\")\n\t"
+ "|> keep(columns:[\"_measurement\", \"_time\", \"_value\"])",
INFLUX2_DATE_FORMATTER.format(now.toInstant()), INFLUX2_DATE_FORMATTER.format(tomorrow.toInstant()));
assertThat(queryV2, equalTo(expectedQueryV2));
}
Expand All @@ -130,6 +133,7 @@ public void testValueOperator() {
assertThat(queryV2,
equalTo("from(bucket:\"origin\")\n\t" + "|> range(start:-100y)\n\t"
+ "|> filter(fn: (r) => r[\"_measurement\"] == \"sampleItem\")\n\t"
+ "|> keep(columns:[\"_measurement\", \"_time\", \"_value\"])\n\t"
+ "|> filter(fn: (r) => (r[\"_field\"] == \"value\" and r[\"_value\"] <= 90))"));
}

Expand All @@ -144,7 +148,8 @@ public void testPagination() {

String queryV2 = instanceV2.createQuery(criteria, RETENTION_POLICY);
assertThat(queryV2, equalTo("from(bucket:\"origin\")\n\t" + "|> range(start:-100y)\n\t"
+ "|> filter(fn: (r) => r[\"_measurement\"] == \"sampleItem\")\n\t" + "|> limit(n:10, offset:20)"));
+ "|> filter(fn: (r) => r[\"_measurement\"] == \"sampleItem\")\n\t"
+ "|> keep(columns:[\"_measurement\", \"_time\", \"_value\"])\n\t" + "|> limit(n:10, offset:20)"));
}

@Test
Expand All @@ -159,6 +164,7 @@ public void testOrdering() {
assertThat(queryV2,
equalTo("from(bucket:\"origin\")\n\t" + "|> range(start:-100y)\n\t"
+ "|> filter(fn: (r) => r[\"_measurement\"] == \"sampleItem\")\n\t"
+ "|> keep(columns:[\"_measurement\", \"_time\", \"_value\"])\n\t"
+ "|> sort(desc:false, columns:[\"_time\"])"));
}

Expand Down Expand Up @@ -189,7 +195,8 @@ public void testMeasurementNameFromMetadata() {
assertThat(queryV2,
equalTo("from(bucket:\"origin\")\n\t" + "|> range(start:-100y)\n\t"
+ "|> filter(fn: (r) => r[\"_measurement\"] == \"measurementName\")\n\t"
+ "|> filter(fn: (r) => r[\"item\"] == \"sampleItem\")"));
+ "|> filter(fn: (r) => r[\"item\"] == \"sampleItem\")\n\t"
+ "|> keep(columns:[\"_measurement\", \"_time\", \"_value\", \"item\"])"));

when(metadataRegistry.get(metadataKey))
.thenReturn(new Metadata(metadataKey, "", Map.of("key1", "val1", "key2", "val2")));
Expand All @@ -198,7 +205,9 @@ public void testMeasurementNameFromMetadata() {
assertThat(queryV1, equalTo("SELECT \"value\"::field,\"item\"::tag FROM origin.sampleItem;"));

queryV2 = instanceV2.createQuery(criteria, RETENTION_POLICY);
assertThat(queryV2, equalTo("from(bucket:\"origin\")\n\t" + "|> range(start:-100y)\n\t"
+ "|> filter(fn: (r) => r[\"_measurement\"] == \"sampleItem\")"));
assertThat(queryV2,
equalTo("from(bucket:\"origin\")\n\t" + "|> range(start:-100y)\n\t"
+ "|> filter(fn: (r) => r[\"_measurement\"] == \"sampleItem\")\n\t"
+ "|> keep(columns:[\"_measurement\", \"_time\", \"_value\"])"));
}
}

0 comments on commit 26258e8

Please sign in to comment.