Allow calculated PandasColumns and multi-column PandasMetrics - see a…
rhunwicks committed Oct 2, 2017
1 parent c69a161 commit fbfd685
Showing 3 changed files with 255 additions and 49 deletions.
157 changes: 125 additions & 32 deletions contrib/connectors/pandas/models.py
@@ -59,6 +59,7 @@ class PandasColumn(Model, BaseColumn):
'PandasDatasource',
backref=backref('columns', cascade='all, delete-orphan'),
foreign_keys=[pandas_datasource_id])
expression = Column(Text)

@property
def is_num(self):
@@ -83,10 +84,6 @@ def is_string(self):
date_types = ('DATE', 'TIME', 'DATETIME')
str_types = ('VARCHAR', 'STRING', 'CHAR')

@property
def expression(self):
return ''

@property
def data(self):
attrs = (
@@ -288,6 +285,13 @@ def get_empty_dataframe(self):
return pd.DataFrame({k: pd.Series(dtype=t) for k, t in columns})
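For context, the dict-of-empty-Series construction shown here yields a zero-row DataFrame that still carries the correct dtypes; a minimal sketch with hypothetical column definitions:

```python
import pandas as pd

columns = [('name', 'object'), ('value', 'float64')]
empty = pd.DataFrame({k: pd.Series(dtype=t) for k, t in columns})
print(len(empty))    # 0
print(empty.dtypes)  # name: object, value: float64
```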

def get_dataframe(self):
"""
Read the source_url and return a Pandas DataFrame.
Use the PandasColumns to coerce columns into the correct dtype,
and add any calculated columns to the DataFrame.
"""
calculated_columns = []
if self.df is None:
self.df = self.pandas_read_method(self.source_url,
**self.pandas_read_parameters)
@@ -298,7 +302,12 @@ def get_dataframe(self):
for col in self.columns:
name = col.column_name
type = col.type
if type != self.df[name].dtype.name:
# Prepare calculated columns
if col.expression:
calculated_columns.append('{name} = {expr}'.format(
name=name, expr=col.expression))
elif type != self.df[name].dtype.name:
# Convert column to correct dtype
try:
self.df[name] = self.df[name].values.astype(type)
except ValueError as e:
@@ -309,6 +318,12 @@
new_type=type)
e.args = (message,) + e.args
raise
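The `.values.astype(...)` call coerces the underlying NumPy array, and an impossible cast raises a ValueError that the code above re-raises with added context. A small sketch of the failure mode it guards against:

```python
import pandas as pd

s = pd.Series(['1', '2', 'x'])
try:
    s.values.astype('int64')
except ValueError as e:
    print(e)  # invalid literal for int() with base 10: 'x'
```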
# Add the calculated columns, using a multi-line string to add them all at once
# See https://pandas.pydata.org/pandas-docs/stable/enhancingperf.html#enhancingperf-eval
if calculated_columns:
self.df.eval('\n'.join(calculated_columns),
truediv=True,
inplace=True)
return self.df
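For reference, `DataFrame.eval()` accepts a multi-line expression where each line is one column assignment, so every calculated column is added in a single pass (see the pandas enhancing-performance docs linked above). A minimal sketch with hypothetical columns and expressions:

```python
import pandas as pd

df = pd.DataFrame({'price': [10.0, 20.0], 'quantity': [3, 4]})

# One assignment per line, mirroring the expressions collected from
# PandasColumn.expression; later lines may refer to earlier results
calculated_columns = [
    'revenue = price * quantity',
    'unit_margin = revenue / quantity',
]
df.eval('\n'.join(calculated_columns), truediv=True, inplace=True)
print(df.columns.tolist())
# ['price', 'quantity', 'revenue', 'unit_margin']
```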

def get_filter_query(self, filter):
@@ -347,6 +362,24 @@ def get_filter_query(self, filter):
query += "({col} {op} {eq})".format(col=col, op=op, eq=eq)
return query
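The filter strings built here are `(column op value)` clauses joined together; a minimal sketch of how such a string behaves when passed to `DataFrame.query()`, using hypothetical column names:

```python
import pandas as pd

df = pd.DataFrame({'country': ['KE', 'UG', 'KE'], 'value': [1, 2, 3]})

# A query string of the shape assembled by get_filter_query()
print(df.query("(country == 'KE') and (value > 1)"))
#   country  value
# 2      KE      3
```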

def get_agg_function(self, expr):
"""
Return a function that can be passed to DataFrame.apply().
Complex expressions that work on multiple columns must be defined
as a function that accepts a Group as the parameter.
The function can be defined on the Connector, on the DataFrame,
or in the local scope.
"""
if expr in ['sum', 'mean', 'std', 'sem', 'count']:
return expr
if hasattr(self, expr):
return getattr(self, expr)
if hasattr(self.get_dataframe(), expr):
return getattr(self.get_dataframe(), expr)
return locals()[expr]
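The lookup order is: pandas built-in aggregation names pass through as strings, then connector methods, then DataFrame methods, and finally names in the local scope. A stripped-down, hypothetical stand-in (`Demo`, `my_range` are invented here) showing what each branch resolves to:

```python
import pandas as pd

class Demo:
    """Illustration only; mirrors the resolution order above."""
    def get_dataframe(self):
        return pd.DataFrame({'value': [1.0, 2.0, 3.0]})

    def my_range(self, group):          # connector-defined aggregation
        return group.max() - group.min()

    def get_agg_function(self, expr):
        if expr in ['sum', 'mean', 'std', 'sem', 'count']:
            return expr
        if hasattr(self, expr):
            return getattr(self, expr)
        if hasattr(self.get_dataframe(), expr):
            return getattr(self.get_dataframe(), expr)
        return locals()[expr]

ds = Demo()
print(ds.get_agg_function('sum'))       # 'sum', passed through as a string
print(ds.get_agg_function('my_range'))  # bound method on the connector
print(ds.get_agg_function('nunique'))   # bound DataFrame method
```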

def process_dataframe(
self,
df,
@@ -426,14 +459,14 @@ def process_dataframe(
assert isinstance(metric.source, basestring)
aggregates = {metric.source: metric.expression}
df = (df[df.set_index(groupby).index.isin(
df.groupby(groupby)
df.groupby(groupby, sort=False)
.aggregate(aggregates)
.sort_values(metric.source,
ascending=metric_order_asc)
.iloc[:timeseries_limit].index)])

query_str += ('[df.set_index({groupby}).index.isin('
'df.groupby({groupby})'
'df.groupby({groupby}, sort=False)'
'.aggregate({aggregates})'
".sort_values('{metric.source}', "
'ascending={metric_order_asc})'
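The `set_index(...).index.isin(...)` idiom above keeps only the rows belonging to the top `timeseries_limit` groups as ranked by the metric; a standalone sketch with hypothetical data:

```python
import pandas as pd

df = pd.DataFrame({'series': ['a', 'a', 'b', 'c'],
                   'value': [5, 5, 3, 9]})
groupby, timeseries_limit = ['series'], 2

top_index = (df.groupby(groupby, sort=False)
               .aggregate({'value': 'sum'})
               .sort_values('value', ascending=False)
               .iloc[:timeseries_limit].index)
df = df[df.set_index(groupby).index.isin(top_index)]
# keeps series 'a' (sum 10) and 'c' (sum 9), drops 'b' (sum 3)
```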
@@ -474,38 +507,97 @@ def process_dataframe(

# Single-column aggregates can be calculated using aggregate,
# multi-column ones need to use apply.
# aggregates is a dict keyed by a column name, or a tuple
# of column names, where the value is a list of expressions
# that can be used by DataFrame.aggregate() on those columns
# aggregates is a dict keyed by a column name, where the value is
# a list of expressions that can be used by DataFrame.aggregate()
# apply_functions is a dict keyed by the metric name, where the
# value is a function that can be passed to DataFrame.apply()
aggregates = OrderedDict()
for metric in metrics_exprs.values():
assert isinstance(metric.source, basestring)
if metric.source in aggregates:
aggregates[metric.source].append(metric.expression)
agg_names = []
apply_functions = []
apply_names = []
for metric_name, metric in metrics_exprs.items():
sources = []
if metric.source:
sources = [s.strip() for s in metric.source.split(',') if s]
if len(sources) == 1:
# Single column source, so use aggregate
func = self.get_agg_function(metric.expression)
if metric.source in aggregates:
aggregates[metric.source].append(func)
else:
aggregates[metric.source] = [func]
agg_names.append(metric_name)
else:
aggregates[metric.source] = [metric.expression]
# Multiple columns so the expression must be a function
# that accepts a Group as the parameter
apply_functions.append((sources,
metric.expression,
self.get_agg_function(metric.expression)))
apply_names.append(metric_name)
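The split matters because `DataFrame.aggregate()` applies each function to one column at a time, whereas a function applied to a group sees all of its columns at once. A minimal sketch of the two paths, using a hypothetical weighted mean as the multi-column metric:

```python
import pandas as pd

df = pd.DataFrame({'region': ['n', 'n', 's'],
                   'value': [1.0, 2.0, 3.0],
                   'weight': [1, 3, 2]})

# Single-column metric: a plain aggregate expression
print(df.groupby(['region'], sort=False).aggregate({'value': ['sum']}))

# Multi-column metric: the function receives the whole group
def weighted_mean(group):
    return (group['value'] * group['weight']).sum() / group['weight'].sum()

print(df.groupby(['region'], sort=False)[['value', 'weight']]
        .apply(weighted_mean))
# n    1.75
# s    3.00
```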

# Build a list of like-indexed DataFrames containing the results
# of DataFrame.aggregate() and individual DataFrame.apply() calls
dfs = []
query_strs = []

if groupby or timestamp_exprs:
df = (df.groupby(groupby + timestamp_exprs)
.aggregate(aggregates)
.reset_index())
query_str += ('.groupby({groupby})'
'.aggregate({aggregates})'
'.reset_index()').format(
groupby=groupby,
aggregates=aggregates)
df = df.groupby(groupby + timestamp_exprs, sort=False)
query_str += '.groupby({}, sort=False)'.format(groupby + timestamp_exprs)

for sources, expr, func in apply_functions:
if sources:
dfs.append(df[sources].apply(func))
query_strs.append(query_str +
'[{}].apply({})'.format(sources, expr))
else:
dfs.append(df.apply(func))
query_strs.append(query_str + '.apply({})'.format(expr))
else:
# Multi-column aggregates need to be passed the DataFrame,
# whereas if we call DataFrame.apply() without a groupby
# the func is called on each column individually
for sources, expr, func in apply_functions:
if sources:
dfs.append(pd.Series(func(df[sources])))
query_strs.append('pd.Series({expr}({df}[{sources}]))'.format(
expr=expr,
df=query_str,
sources=sources))
else:
dfs.append(pd.Series(func(df)))
query_strs.append('pd.Series({expr}({df}))'.format(
expr=expr,
df=query_str))

if aggregates:
dfs.append(df.aggregate(aggregates))
query_strs.append(query_str +
'.aggregate({})'.format(aggregates))

# If there is more than one DataFrame in the list then
# concatenate them along the index
if len(dfs) > 1:
df = pd.concat(dfs, axis=1)
query_str = 'pd.concat([{}])'.format(', '.join(query_strs))
else:
df = dfs[0]
query_str = query_strs[0]
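Because every result in `dfs` is indexed by the same group keys, concatenating along `axis=1` lines the metrics up row by row; a small sketch with hypothetical results:

```python
import pandas as pd

idx = pd.Index(['n', 's'], name='region')
agg_result = pd.Series([3.0, 3.0], index=idx)     # e.g. a sum metric
apply_result = pd.Series([1.75, 3.0], index=idx)  # e.g. a weighted mean

combined = pd.concat([agg_result, apply_result], axis=1)
print(combined)  # one row per region, one aligned column per metric
```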

if groupby or timestamp_exprs:
df = df.reset_index()
query_str += '.reset_index()'
else:
df = df.aggregate(aggregates)
query_str += '.aggregate({aggregates})'.format(
aggregates=aggregates)
# Note that Superset expects a DataFrame with a single row and
# the metrics as columns, rather than with the metrics
# as the index
df = df.T.reset_index(drop=True)
query_str += '.T.reset_index(drop=True)'
# as the index, so if we have a summary then we need to
# reindex it
df = df.bfill(axis=1).T.reset_index(drop=True).head(n=1)
query_str += '.bfill(axis=1).T.reset_index(drop=True).head(n=1)'
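Without a groupby, `DataFrame.aggregate()` returns the functions on the index and NaN wherever a function was not requested for a column; the bfill/transpose/head chain collapses that into the single summary row Superset expects. A sketch, assuming one function per column:

```python
import pandas as pd

df = pd.DataFrame({'value': [1.0, 2.0], 'weight': [3.0, 4.0]})
summary = df.aggregate({'value': ['sum'], 'weight': ['mean']})
#        value  weight
# sum      3.0     NaN
# mean     NaN     3.5

row = summary.bfill(axis=1).T.reset_index(drop=True).head(n=1)
#    sum  mean
# 0  3.0   3.5
```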

df.columns = groupby + timestamp_cols + metrics + filtered_metrics
# Set the correct column names and then reorder the columns
# to match the requested order
df.columns = groupby + timestamp_cols + apply_names + agg_names
df = df[groupby + timestamp_cols + metrics + filtered_metrics]

# Filtering of rows post-aggregation based on metrics
# Note that the front end uses `having_druid` as the parameter
@@ -548,12 +640,13 @@ def process_dataframe(
# Use the group by as a tie breaker
orderby = orderby + groupby
ascending = ascending + ([True] * len(groupby))
df = (df.groupby(groupby)
df = (df.groupby(groupby, sort=False)
.size()
.reset_index()
.sort_values(orderby, ascending=ascending)
.drop(0, axis=1))
query_str += ('.groupby({groupby}).size().reset_index()'
query_str += ('.groupby({groupby}, sort=False).size()'
'.reset_index()'
'.sort_values({orderby}, ascending={ascending})'
'.drop(0, axis=1)').format(
groupby=groupby,
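A sketch of the tie-breaker ordering above: grouping and taking `.size()` yields one row per group plus a count column literally named `0`, which is dropped once the sort is done (hypothetical data, ordering by the groupby column itself):

```python
import pandas as pd

df = pd.DataFrame({'region': ['s', 'n', 'n'], 'value': [1, 2, 3]})

ordered = (df.groupby(['region'], sort=False)
             .size()
             .reset_index()
             .sort_values(['region'], ascending=[True])
             .drop(0, axis=1))
print(ordered)  # one row per region, ordered n, s
```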
