diff --git a/contrib/connectors/pandas/models.py b/contrib/connectors/pandas/models.py
index b89eff0b04349..8bab58c6949e6 100644
--- a/contrib/connectors/pandas/models.py
+++ b/contrib/connectors/pandas/models.py
@@ -59,6 +59,7 @@ class PandasColumn(Model, BaseColumn):
         'PandasDatasource',
         backref=backref('columns', cascade='all, delete-orphan'),
         foreign_keys=[pandas_datasource_id])
+    expression = Column(Text)

     @property
     def is_num(self):
@@ -83,10 +84,6 @@ def is_string(self):
     date_types = ('DATE', 'TIME', 'DATETIME')
     str_types = ('VARCHAR', 'STRING', 'CHAR')

-    @property
-    def expression(self):
-        return ''
-
     @property
     def data(self):
         attrs = (
@@ -288,6 +285,13 @@ def get_empty_dataframe(self):
         return pd.DataFrame({k: pd.Series(dtype=t) for k, t in columns})

     def get_dataframe(self):
+        """
+        Read the source_url and return a Pandas DataFrame.
+
+        Use the PandasColumns to coerce columns into the correct dtype,
+        and add any calculated columns to the DataFrame.
+        """
+        calculated_columns = []
         if self.df is None:
             self.df = self.pandas_read_method(self.source_url,
                                               **self.pandas_read_parameters)
@@ -298,7 +302,12 @@ def get_dataframe(self):
         for col in self.columns:
             name = col.column_name
             type = col.type
-            if type != self.df[name].dtype.name:
+            # Prepare calculated columns
+            if col.expression:
+                calculated_columns.append('{name} = {expr}'.format(
+                    name=name, expr=col.expression))
+            elif type != self.df[name].dtype.name:
+                # Convert column to correct dtype
                 try:
                     self.df[name] = self.df[name].values.astype(type)
                 except ValueError as e:
@@ -309,6 +318,12 @@ def get_dataframe(self):
                         new_type=type)
                     e.args = (message,) + e.args
                     raise
+        # Add the calculated columns, using a multi-line string to add them all at once
+        # See https://pandas.pydata.org/pandas-docs/stable/enhancingperf.html#enhancingperf-eval
+        if calculated_columns:
+            self.df.eval('\n'.join(calculated_columns),
+                         truediv=True,
+                         inplace=True)
         return self.df

     def get_filter_query(self, filter):
@@ -347,6 +362,24 @@ def get_filter_query(self, filter):
             query += "({col} {op} {eq})".format(col=col, op=op, eq=eq)
         return query

+    def get_agg_function(self, expr):
+        """
+        Return a function that can be passed to DataFrame.apply().
+
+        Complex expressions that work on multiple columns must be a function
+        that accepts a Group as the parameter.
+
+        The function can be defined on the Connector, on the DataFrame,
+        or in the local scope.
+        """
+        if expr in ['sum', 'mean', 'std', 'sem', 'count']:
+            return expr
+        if hasattr(self, expr):
+            return getattr(self, expr)
+        if hasattr(self.get_dataframe(), expr):
+            return getattr(self.get_dataframe(), expr)
+        return locals()[expr]
+
     def process_dataframe(
             self,
             df,
@@ -426,14 +459,14 @@ def process_dataframe(
                 assert isinstance(metric.source, basestring)
                 aggregates = {metric.source: metric.expression}
                 df = (df[df.set_index(groupby).index.isin(
-                    df.groupby(groupby)
+                    df.groupby(groupby, sort=False)
                     .aggregate(aggregates)
                     .sort_values(metric.source,
                                  ascending=metric_order_asc)
                     .iloc[:timeseries_limit].index)])

                 query_str += ('[df.set_index({groupby}).index.isin('
-                              'df.groupby({groupby})'
+                              'df.groupby({groupby}, sort=False)'
                               '.aggregate({aggregates})'
                               ".sort_values('{metric.source}', "
                               'ascending={metric_order_asc})'
@@ -474,38 +507,97 @@ def process_dataframe(
         # Single-column aggregates can be calculated using aggregate,
         # multi-column ones need to use apply.
-        # aggregates is a dict keyed by a column name, or a tuple
-        # of column names, where the value is a list of expressions
-        # that can be used by DataFrame.aggregate() on those columns
+        # aggregates is a dict keyed by a column name, where the value is
+        # a list of functions that can be passed to DataFrame.aggregate()
+        # apply_functions is a list of (sources, expression, function) tuples
+        # for DataFrame.apply(), with the corresponding metric names
+        # collected in apply_names
         aggregates = OrderedDict()
-        for metric in metrics_exprs.values():
-            assert isinstance(metric.source, basestring)
-            if metric.source in aggregates:
-                aggregates[metric.source].append(metric.expression)
+        agg_names = []
+        apply_functions = []
+        apply_names = []
+        for metric_name, metric in metrics_exprs.items():
+            sources = []
+            if metric.source:
+                sources = [s.strip() for s in metric.source.split(',') if s]
+            if len(sources) == 1:
+                # Single column source, so use aggregate
+                func = self.get_agg_function(metric.expression)
+                if metric.source in aggregates:
+                    aggregates[metric.source].append(func)
+                else:
+                    aggregates[metric.source] = [func]
+                agg_names.append(metric_name)
             else:
-                aggregates[metric.source] = [metric.expression]
+                # Multiple columns, so the expression must be a function
+                # that accepts a Group as the parameter
+                apply_functions.append((sources,
+                                        metric.expression,
+                                        self.get_agg_function(metric.expression)))
+                apply_names.append(metric_name)
+
+        # Build a list of like-indexed DataFrames containing the results
+        # of DataFrame.aggregate() and individual DataFrame.apply() calls
+        dfs = []
+        query_strs = []

         if groupby or timestamp_exprs:
-            df = (df.groupby(groupby + timestamp_exprs)
-                  .aggregate(aggregates)
-                  .reset_index())
-            query_str += ('.groupby({groupby})'
-                          '.aggregate({aggregates})'
-                          '.reset_index()').format(
-                groupby=groupby,
-                aggregates=aggregates)
+            df = df.groupby(groupby + timestamp_exprs, sort=False)
+            query_str += '.groupby({}, sort=False)'.format(groupby + timestamp_exprs)
+
+            for sources, expr, func in apply_functions:
+                if sources:
+                    dfs.append(df[sources].apply(func))
+                    query_strs.append(query_str +
+                                      '[{}].apply({})'.format(sources, expr))
+                else:
+                    dfs.append(df.apply(func))
+                    query_strs.append(query_str + '.apply({})'.format(expr))
+        else:
+            # Multi-column aggregates need to be passed the DataFrame,
+            # whereas if we call DataFrame.apply() without a groupby
+            # the func is called on each column individually
+            for sources, expr, func in apply_functions:
+                if sources:
+                    dfs.append(pd.Series(func(df[sources])))
+                    query_strs.append('pd.Series({expr}({df}[{sources}]))'.format(
+                        expr=expr,
+                        df=query_str,
+                        sources=sources))
+                else:
+                    dfs.append(pd.Series(func(df)))
+                    query_strs.append('pd.Series({expr}({df}))'.format(
+                        expr=expr,
+                        df=query_str))
+
+        if aggregates:
+            dfs.append(df.aggregate(aggregates))
+            query_strs.append(query_str +
+                              '.aggregate({})'.format(aggregates))
+
+        # If there is more than one DataFrame in the list then concatenate
+        # them column-wise, aligning them on their shared index
+        if len(dfs) > 1:
+            df = pd.concat(dfs, axis=1)
+            query_str = 'pd.concat([{}], axis=1)'.format(', '.join(query_strs))
+        else:
+            df = dfs[0]
+            query_str = query_strs[0]
+
+        if groupby or timestamp_exprs:
+            df = df.reset_index()
+            query_str += '.reset_index()'
         else:
-            df = df.aggregate(aggregates)
-            query_str += '.aggregate({aggregates})'.format(
-                aggregates=aggregates)

             # Note that Superset expects a DataFrame with single Row and
             # the metrics as columns, rather than with the metrics
-            # as the index
-            df = df.T.reset_index(drop=True)
-            query_str += '.T.reset_index(drop=True)'
+            # as the index, so if we have a summary then we need to
+            # reindex it
+            df = df.bfill(axis=1).T.reset_index(drop=True).head(n=1)
+            query_str += '.bfill(axis=1).T.reset_index(drop=True).head(n=1)'

-        df.columns = groupby + timestamp_cols + metrics + filtered_metrics
+        # Set the correct column names and then reorder the columns
+        # to match the requested order
+        df.columns = groupby + timestamp_cols + apply_names + agg_names
+        df = df[groupby + timestamp_cols + metrics + filtered_metrics]

         # Filtering of rows post-aggregation based on metrics
         # Note that the front end uses `having_druid` as the parameter
@@ -548,12 +640,13 @@ def process_dataframe(
             # Use the group by as a tie breaker
             orderby = orderby + groupby
             ascending = ascending + ([True] * len(groupby))
-            df = (df.groupby(groupby)
+            df = (df.groupby(groupby, sort=False)
                   .size()
                   .reset_index()
                   .sort_values(orderby, ascending=ascending)
                   .drop(0, axis=1))
-            query_str += ('.groupby({groupby}).size().reset_index()'
+            query_str += ('.groupby({groupby}, sort=False).size()'
+                          '.reset_index()'
                           '.sort_values({orderby}, ascending={ascending})'
                           '.drop(0, axis=1)').format(
                               groupby=groupby,
diff --git a/contrib/tests/connector_tests.py b/contrib/tests/connector_tests.py
index 2722bebb5cc21..632bb829ea681 100644
--- a/contrib/tests/connector_tests.py
+++ b/contrib/tests/connector_tests.py
@@ -47,22 +47,22 @@ def setUpClass(cls):
         super(BaseConnectorTestCase, cls).setUpClass()

         data = """
-            | region   | district   | project   | received            | value  |
-            |----------|------------|-----------|---------------------|--------|
-            | Region 1 | District A | Project A | 2001-01-31 10:00:00 | 33     |
-            | Region 1 | District A | Project A | 2001-01-31 12:00:00 | 32     |
-            | Region 1 | District B | Project B | 2001-01-31 13:00:00 | 35     |
-            | Region 2 | District C | Project C | 2001-01-31 09:00:00 | 12     |
-            | Region 1 | District A | Project A | 2001-02-28 09:00:00 | 66     |
-            | Region 1 | District B | Project B | 2001-02-28 08:00:00 | 15     |
-            | Region 1 | District B | Project B | 2001-02-28 10:00:00 | 25     |
-            | Region 2 | District C | Project C | 2001-02-28 08:00:00 | 18     |
-            | Region 1 | District A | Project A | 2001-03-31 11:00:00 | 85     |
-            | Region 1 | District B | Project B | 2001-03-31 12:00:00 | 5      |
-            | Region 2 | District C | Project C | 2001-03-31 14:00:00 | 35     |
-            | Region 1 | District A | Project A | 2001-04-30 10:00:00 | 15     |
-            | Region 1 | District A | Project A | 2001-04-30 12:00:00 | 15     |
-            | Region 2 | District C | Project C | 2001-04-30 13:00:00 | 15     |
+            | region   | district   | project   | received            | value  | value2 | category  |
+            |----------|------------|-----------|---------------------|--------|--------|-----------|
+            | Region 1 | District A | Project A | 2001-01-31 10:00:00 | 33     | 12.30  | CategoryA |
+            | Region 1 | District A | Project A | 2001-01-31 12:00:00 | 32     | 13.60  | CategoryB |
+            | Region 1 | District B | Project B | 2001-01-31 13:00:00 | 35     | 15.50  | CategoryA |
+            | Region 2 | District C | Project C | 2001-01-31 09:00:00 | 12     | 17.80  | CategoryB |
+            | Region 1 | District A | Project A | 2001-02-28 09:00:00 | 66     | 11.30  | CategoryB |
+            | Region 1 | District B | Project B | 2001-02-28 08:00:00 | 15     | 19.90  | CategoryB |
+            | Region 1 | District B | Project B | 2001-02-28 10:00:00 | 25     | 15.30  | CategoryA |
+            | Region 2 | District C | Project C | 2001-02-28 08:00:00 | 18     | 13.80  | CategoryA |
+            | Region 1 | District A | Project A | 2001-03-31 11:00:00 | 85     | 45.10  | CategoryB |
+            | Region 1 | District B | Project B | 2001-03-31 12:00:00 | 5      | 28.10  | CategoryA |
+            | Region 2 | District C | Project C | 2001-03-31 14:00:00 | 35     | 22.60  | CategoryB |
+            | Region 1 | District A | Project A | 2001-04-30 10:00:00 | 15     | 11.00  | CategoryA |
+            | Region 1 | District A | Project A | 2001-04-30 12:00:00 | 15     | 16.10  | CategoryB |
+            | Region 2 | District C | Project C | 2001-04-30 13:00:00 | 15     | 18.50  | CategoryA |
         """

     def assertFrameEqual(self, frame1, frame2, msg=None):
@@ -162,7 +162,7 @@ def test_summary_single_metric(self):
     def test_summary_multiple_metrics(self):
         parameters = {
             'groupby': [],
-            'metrics': ['sum__value', 'avg__value'],
+            'metrics': ['sum__value', 'avg__value', 'value_percentage'],
             'granularity': 'received',
             'from_dttm': datetime.datetime(2001, 1, 1),
             'to_dttm': datetime.datetime(2001, 12, 31),
@@ -182,6 +182,8 @@ def test_summary_multiple_metrics(self):
         expected_df = pd.DataFrame(OrderedDict([
             ('sum__value', [self.df['value'].sum()]),
             ('avg__value', [self.df['value'].mean()]),
+            ('value_percentage', [sum(self.df['value']) /
+                                  sum(self.df['value'] + self.df['value2'])]),
         ]))

         self.assertEqual(result.df, expected_df)
@@ -603,6 +605,90 @@ def test_groupby_multiple_metrics(self):
         expected_df = expected_df.sort_values(['sum__value'], ascending=False)
         self.assertEqual(result.df, expected_df)

+    def test_groupby_ratio_metric(self):
+        parameters = {
+            'groupby': ['project', 'region'],
+            'metrics': ['ratio'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {
+                'time_grain_sqla': None,
+            },
+        }
+        self.df['ratio'] = self.df['value'] / self.df['value2']
+        result = self.datasource.query(parameters)
+        self.assertIsInstance(result, QueryResult)
+        self.assertEqual(result.error_message, None)
+        self.assertEqual(result.status, QueryStatus.SUCCESS)
+        expected_df = (self.df.groupby(parameters['groupby'])['ratio']
+                       .mean()
+                       .reset_index()
+                       .sort_values(['ratio'], ascending=False))
+        expected_df.columns = parameters['groupby'] + parameters['metrics']
+        self.assertEqual(result.df, expected_df)
+
+    def test_groupby_value_percentage_metric(self):
+        parameters = {
+            'groupby': ['project', 'region'],
+            'metrics': ['value_percentage'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {
+                'time_grain_sqla': None,
+            },
+        }
+        result = self.datasource.query(parameters)
+        self.assertIsInstance(result, QueryResult)
+        self.assertEqual(result.error_message, None)
+        self.assertEqual(result.status, QueryStatus.SUCCESS)
+        expected_df = (self.df.groupby(parameters['groupby'])
+                       .apply(lambda x: sum(x['value'])/sum(x['value'] + x['value2']))
+                       .reset_index()
+                       .sort_values([0], ascending=False))
+        expected_df.columns = parameters['groupby'] + parameters['metrics']
+        self.assertEqual(result.df, expected_df)
+
+    def test_groupby_category_percentage_metric(self):
+        parameters = {
+            'groupby': ['project', 'region'],
+            'metrics': ['category_percentage'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {
+                'time_grain_sqla': None,
+            },
+        }
+        result = self.datasource.query(parameters)
+        self.assertIsInstance(result, QueryResult)
+        self.assertEqual(result.error_message, None)
+        self.assertEqual(result.status, QueryStatus.SUCCESS)
+        expected_df = (self.df.groupby(parameters['groupby'])['category']
+                       .value_counts(normalize=True)
+                       .reset_index(parameters['groupby'])
+                       .loc['CategoryA']
+                       .reset_index(drop=True)
+                       .sort_values(['category'], ascending=False))
+        expected_df.columns = parameters['groupby'] + parameters['metrics']
+        self.assertEqual(result.df, expected_df)
+
     def test_groupby_ascending_order(self):
         parameters = {
             'groupby': ['project', 'region'],
@@ -844,6 +930,12 @@ class SqlaConnectorTestCase(BaseConnectorTestCase):
                   expression='SUM(value)'),
         SqlMetric(metric_name='avg__value', metric_type='avg',
                   expression='AVG(value)'),
+        SqlMetric(metric_name='ratio', metric_type='avg',
+                  expression='AVG(value/value2)'),
+        SqlMetric(metric_name='value_percentage', metric_type='custom',
+                  expression="SUM(value)/SUM(value + value2)"),
+        SqlMetric(metric_name='category_percentage', metric_type='custom',
+                  expression="SUM(CASE WHEN category='CategoryA' THEN 1 ELSE 0 END)/CAST(COUNT(*) AS REAL)"),
     ]

     def setUp(self):
@@ -873,6 +965,10 @@ class PandasConnectorTestCase(BaseConnectorTestCase):
         PandasColumn(column_name='project', type='object'),
         PandasColumn(column_name='received', type='datetime64[D]'),
         PandasColumn(column_name='value', type='int64'),
+        PandasColumn(column_name='ratio', type='float64',
+                     expression="value / value2"),
+        PandasColumn(column_name='inverse_ratio', type='float64',
+                     expression="value2 / value"),
     ]

     metrics = [
@@ -880,6 +976,12 @@ class PandasConnectorTestCase(BaseConnectorTestCase):
         PandasMetric(metric_name='sum__value', metric_type='sum',
                      source='value', expression='sum'),
         PandasMetric(metric_name='avg__value', metric_type='avg',
                      source='value', expression='mean'),
+        PandasMetric(metric_name='ratio', metric_type='avg',
+                     source='ratio', expression='mean'),
+        PandasMetric(metric_name='value_percentage', metric_type='custom',
+                     source=None, expression='calc_value_percentage'),
+        PandasMetric(metric_name='category_percentage', metric_type='custom',
+                     source='category', expression="calc_category_percentage"),
     ]

     def setUp(self):
@@ -890,6 +992,16 @@ def setUp(self):
             columns=self.columns,
             metrics=self.metrics)

+        def calc_value_percentage(group):
+            return sum(group['value'])/sum(group['value'] + group['value2'])
+
+        self.datasource.calc_value_percentage = calc_value_percentage
+
+        def calc_category_percentage(group):
+            return group.value_counts(normalize=True).loc['CategoryA']
+
+        self.datasource.calc_category_percentage = calc_category_percentage
+
     def test_post_aggregation_filter(self):
         parameters = {
             'groupby': ['project', 'region'],
diff --git a/superset/migrations/versions/b2cd059e8803_add_pandasdatasource.py b/superset/migrations/versions/b2cd059e8803_add_pandasdatasource.py
index 43be7064f66cb..5f45bbe6fc887 100644
--- a/superset/migrations/versions/b2cd059e8803_add_pandasdatasource.py
+++ b/superset/migrations/versions/b2cd059e8803_add_pandasdatasource.py
@@ -64,6 +64,7 @@ def upgrade():
         sa.Column('min', sa.Boolean(), nullable=True),
         sa.Column('filterable', sa.Boolean(), nullable=True),
         sa.Column('description', sa.Text(), nullable=True),
+        sa.Column('expression', sa.Text(), nullable=True),
         sa.Column('id', sa.Integer(), nullable=False),
         sa.Column('pandas_datasource_id', sa.Integer(), nullable=True),
         sa.Column('created_by_fk', sa.Integer(), nullable=True),
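For orientation, here is a minimal usage sketch. It is not part of the diff and is modelled on the PandasConnectorTestCase fixtures above: a calculated column driven by the new `expression` field (evaluated through DataFrame.eval() in get_dataframe()), a single-column metric whose 'mean' expression is resolved by get_agg_function() and handed to DataFrame.aggregate(), and a multi-column metric implemented as a function attached to the datasource instance. The metric name 'avg__ratio' and the bare PandasDatasource(...) call are illustrative only; connection details such as source_url and the read parameters are omitted.

# Hypothetical sketch (not part of the diff), mirroring the test fixtures above
from contrib.connectors.pandas.models import (
    PandasColumn, PandasDatasource, PandasMetric)

columns = [
    PandasColumn(column_name='value', type='int64'),
    PandasColumn(column_name='value2', type='float64'),
    # Calculated column: get_dataframe() adds it with DataFrame.eval()
    PandasColumn(column_name='ratio', type='float64',
                 expression='value / value2'),
]

metrics = [
    # Single-column metric: 'mean' is resolved by get_agg_function() and
    # passed to DataFrame.aggregate() for the 'ratio' column
    PandasMetric(metric_name='avg__ratio', metric_type='avg',
                 source='ratio', expression='mean'),
    # Multi-column metric: the expression names a function that receives
    # each group, so it can combine several columns
    PandasMetric(metric_name='value_percentage', metric_type='custom',
                 source=None, expression='calc_value_percentage'),
]

# Connection details (source_url, format, read parameters) omitted here
datasource = PandasDatasource(columns=columns, metrics=metrics)


def calc_value_percentage(group):
    return sum(group['value']) / sum(group['value'] + group['value2'])


# get_agg_function() looks the expression up as an attribute of the
# datasource, so attach the function to the instance as the tests do
datasource.calc_value_percentage = calc_value_percentage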