Skip to content

Commit

Permalink
[Bug fix] Parse Elasticsearch aggregations results correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
lilinfeng04 committed May 30, 2019
1 parent 4508975 commit 09c6f39
Show file tree
Hide file tree
Showing 2 changed files with 428 additions and 31 deletions.
72 changes: 41 additions & 31 deletions redash/query_runner/elasticsearch.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from copy import copy
import logging
import sys
import urllib
Expand Down Expand Up @@ -183,11 +184,10 @@ def add_column_if_needed(mappings, column_name, friendly_name, result_columns, r
"type": mappings.get(column_name, "string")})
result_columns_index[friendly_name] = result_columns[-1]

def get_row(rows, row):
    """Return *row* as-is, or create a fresh dict and register it in *rows*.

    Acts as a get-or-create: an existing row passes through untouched; a
    None row is replaced by a new empty dict that is also appended to the
    accumulating *rows* list so later collected values land in the output.
    """
    if row is not None:
        return row
    fresh = {}
    rows.append(fresh)
    return fresh
def collect_row(rows, row):
    """Append a shallow copy of *row* to *rows*, skipping consecutive duplicates.

    The copy decouples the stored result from the caller's working row,
    which keeps mutating as further aggregation levels are walked.
    """
    is_repeat = bool(rows) and rows[-1] == row
    if not is_repeat:
        rows.append(copy(row))

def collect_value(mappings, row, key, value, type):
if result_fields and key not in result_fields_index:
Expand All @@ -197,36 +197,45 @@ def collect_value(mappings, row, key, value, type):
add_column_if_needed(mappings, key, key, result_columns, result_columns_index)
row[key] = value

def clean_value(row, key):
    """Null out every entry of *row* whose column name starts with *key*.

    Used when an aggregation bucket list is empty, so stale values from a
    previous bucket under the same key prefix do not leak into this row.
    Returns the same (mutated) *row* dict.
    """
    prefixed = [name for name in row if name.startswith(key)]
    for name in prefixed:
        row[name] = None
    return row

def collect_aggregations(mappings, rows, parent_key, data, row, result_columns, result_columns_index):
if isinstance(data, dict):
for key, value in data.iteritems():
val = collect_aggregations(mappings, rows, parent_key if key == 'buckets' else key, value, row, result_columns, result_columns_index)
if val:
row = get_row(rows, row)
collect_value(mappings, row, key, val, 'long')

for data_key in ['value', 'doc_count']:
if data_key not in data:
continue
if 'key' in data and len(data.keys()) == 2:
key_is_string = 'key_as_string' in data
collect_value(mappings, row, data['key'] if not key_is_string else data['key_as_string'], data[data_key], 'long' if not key_is_string else 'string')
else:
return data[data_key]
if "buckets" in data:
row = collect_aggregations(mappings, rows, parent_key, data['buckets'], row, result_columns,
result_columns_index)
else:
for data_key in ("value", "doc_count", "key_as_string", "key"):
if data.get(data_key, []) != []:
if data_key == "key_as_string":
collect_value(mappings, row, parent_key, data[data_key], "string")
del data['key']
elif data_key == "key":
collect_value(mappings, row, parent_key, data[data_key], "string")
elif data_key == "doc_count":
collect_value(mappings, row, parent_key + ".doc_count", data[data_key], "integer")
else:
_type = "integer" if isinstance(data[data_key], int) else "float"
collect_value(mappings, row, parent_key, data[data_key], _type)
del data[data_key]
for key, value in data.iteritems():
row = collect_aggregations(mappings, rows, parent_key + "." + key, value, row, result_columns,
result_columns_index)

elif isinstance(data, list):
if not data:
row = clean_value(row, parent_key)

for value in data:
result_row = get_row(rows, row)
collect_aggregations(mappings, rows, parent_key, value, result_row, result_columns, result_columns_index)
if 'doc_count' in value:
collect_value(mappings, result_row, 'doc_count', value['doc_count'], 'integer')
if 'key' in value:
if 'key_as_string' in value:
collect_value(mappings, result_row, parent_key, value['key_as_string'], 'string')
else:
collect_value(mappings, result_row, parent_key, value['key'], 'string')

return None
row = collect_aggregations(mappings, rows, parent_key, value, row, result_columns,
result_columns_index)
collect_row(rows, row)

return row

result_columns_index = {c["name"]: c for c in result_columns}

Expand All @@ -247,7 +256,8 @@ def collect_aggregations(mappings, rows, parent_key, data, row, result_columns,
add_column_if_needed(mappings, field, field, result_columns, result_columns_index)

for key, data in raw_result["aggregations"].iteritems():
collect_aggregations(mappings, result_rows, key, data, None, result_columns, result_columns_index)
row = collect_aggregations(mappings, result_rows, key, data, {}, result_columns, result_columns_index)
collect_row(result_rows, row)

logger.debug("result_rows %s", str(result_rows))
logger.debug("result_columns %s", str(result_columns))
Expand Down
Loading

0 comments on commit 09c6f39

Please sign in to comment.