diff --git a/contrib/connectors/pandas/models.py b/contrib/connectors/pandas/models.py
index 28488c1a57c02..8186e67d36940 100644
--- a/contrib/connectors/pandas/models.py
+++ b/contrib/connectors/pandas/models.py
@@ -324,7 +324,8 @@ def get_dataframe(self):
         cache_key = self.cache_key
         self.df = dataframe_cache.get(cache_key)
         if not isinstance(self.df, pd.DataFrame):
-            if isinstance(self.source_url, basestring) and self.source_url[:4] == 'http':
+            if (isinstance(self.source_url, basestring) and
+                    self.source_url[:4] == 'http'):
                 # Use requests to retrieve remote data so we can handle authentication
                 auth = self.source_auth
                 url = self.source_url
@@ -377,7 +378,7 @@ def get_dataframe(self):
                 e.args = (message,) + e.args
                 raise
         # Add the calcuated columns, using a multi-line string to add them all at once
-        # See https://pandas.pydata.org/pandas-docs/stable/enhancingperf.html#enhancingperf-eval
+        # See https://pandas.pydata.org/pandas-docs/stable/enhancingperf.html#enhancingperf-eval  # NOQA: E501
         if calculated_columns:
             self.df.eval('\n'.join(calculated_columns),
                          truediv=True,
@@ -836,7 +837,7 @@ def reconcile_column_metrics(mapper, connection, target):
     Create or delete PandasMetrics to match the metric attributes
     specified on a PandasColumn
     """
-    metrics_table = PandasMetric.__table__
+    mtable = PandasMetric.__table__
     for metric_type in ('sum', 'avg', 'max', 'min', 'count_distinct'):
         # Set up the metric attributes
         metric_name = metric_type + '__' + target.column_name
@@ -852,15 +853,15 @@ def reconcile_column_metrics(mapper, connection, target):
         if getattr(target, metric_type):
             # Create the metric if it doesn't already exist
             result = connection.execute(
-                metrics_table
+                mtable
                 .select()
                 .where(
                     and_(
-                        metrics_table.c.pandas_datasource_id == target.pandas_datasource_id,
-                        metrics_table.c.metric_name == metric_name)))
+                        mtable.c.pandas_datasource_id == target.pandas_datasource_id,
+                        mtable.c.metric_name == metric_name)))
             if not result.rowcount:
                 connection.execute(
-                    metrics_table.insert(),
+                    mtable.insert(),
                     pandas_datasource_id=target.pandas_datasource_id,
                     metric_name=metric_name,
                     verbose_name=verbose_name,
@@ -869,15 +870,15 @@ def reconcile_column_metrics(mapper, connection, target):
         else:
             # Delete the metric if it exists and hasn't been customized
             connection.execute(
-                metrics_table
+                mtable
                 .delete()
                 .where(
                     and_(
-                        metrics_table.c.pandas_datasource_id == target.pandas_datasource_id,
-                        metrics_table.c.metric_name == metric_name,
-                        metrics_table.c.verbose_name == verbose_name,
-                        metrics_table.c.source == source,
-                        metrics_table.c.expression == expression)))
+                        mtable.c.pandas_datasource_id == target.pandas_datasource_id,
+                        mtable.c.metric_name == metric_name,
+                        mtable.c.verbose_name == verbose_name,
+                        mtable.c.source == source,
+                        mtable.c.expression == expression)))
 
 
 def reconcile_metric_column(mapper, connection, target):
@@ -885,18 +886,18 @@ def reconcile_metric_column(mapper, connection, target):
     Clear the metric attribute on a PandasColumn if the
     corresponding PandasMetric is deleted
     """
-    column_table = PandasColumn.__table__
+    ctable = PandasColumn.__table__
     try:
         metric_type, column_name = target.metric_name.split('__', 1)
-        if metric_type in column_table.c:
+        if metric_type in ctable.c:
             connection.execute(
-                column_table
+                ctable
                 .update()
                 .values(**{metric_type: False})
                 .where(
                     and_(
-                        column_table.c.pandas_datasource_id == target.pandas_datasource_id,
-                        column_table.c.column_name == column_name)))
+                        ctable.c.pandas_datasource_id == target.pandas_datasource_id,
+                        ctable.c.column_name == column_name)))
     except ValueError:
         # Metric name doesn't contain __
         pass
diff --git a/contrib/tests/connector_tests.py b/contrib/tests/connector_tests.py
index f9e8925c65f36..682e1ef0057a7 100644
--- a/contrib/tests/connector_tests.py
+++ b/contrib/tests/connector_tests.py
@@ -62,7 +62,7 @@ def setUpClass(cls):
         | Region 1 | District A | Project A | 2001-04-30 10:00:00 | 15 | 11.00 | CategoryA |
         | Region 1 | District A | Project A | 2001-04-30 12:00:00 | 15 | 16.10 | CategoryB |
         | Region 2 | District C | Project C | 2001-04-30 13:00:00 | 15 | 18.50 | CategoryA |
-        """
+        """  # NOQA: E501
 
     def assertFrameEqual(self, frame1, frame2, msg=None):
         # We don't care about the index, because it is
@@ -116,7 +116,7 @@ def test_values_for_column_with_limit(self):
         self.assertEqual(result, self.df['project'].unique()[:1].tolist())
 
     def test_get_query_str(self):
-        parameters = {
+        p = {
             'groupby': ['project'],
             'metrics': ['sum__value'],
             'granularity': 'received',
@@ -131,11 +131,11 @@ def test_get_query_str(self):
                 'time_grain_sqla': None,
             },
         }
-        result = self.datasource.get_query_str(parameters)
+        result = self.datasource.get_query_str(p)
         self.assertIn('project', result)
 
     def test_summary_single_metric(self):
-        parameters = {
+        p = {
             'groupby': [],
             'metrics': ['sum__value'],
             'granularity': 'received',
@@ -150,7 +150,7 @@ def test_summary_single_metric(self):
                 'time_grain_sqla': None,
             },
         }
-        result = self.datasource.query(parameters)
+        result = self.datasource.query(p)
         self.assertIsInstance(result, QueryResult)
         self.assertEqual(result.error_message, None)
         self.assertEqual(result.status, QueryStatus.SUCCESS)
@@ -158,7 +158,7 @@ def test_summary_single_metric(self):
         self.assertEqual(result.df, expected_df)
 
     def test_summary_multiple_metrics(self):
-        parameters = {
+        p = {
             'groupby': [],
             'metrics': ['sum__value', 'avg__value', 'value_percentage', 'ratio'],
             'granularity': 'received',
@@ -174,7 +174,7 @@ def test_summary_multiple_metrics(self):
             },
         }
         self.df['ratio'] = self.df['value'] / self.df['value2']
-        result = self.datasource.query(parameters)
+        result = self.datasource.query(p)
         self.assertIsInstance(result, QueryResult)
         self.assertEqual(result.error_message, None)
         self.assertEqual(result.status, QueryStatus.SUCCESS)
@@ -188,7 +188,7 @@ def test_summary_multiple_metrics(self):
         self.assertEqual(result.df, expected_df)
 
     def test_from_to_dttm(self):
-        parameters = {
+        p = {
             'groupby': [],
             'metrics': ['sum__value'],
             'granularity': 'received',
@@ -203,7 +203,7 @@ def test_from_to_dttm(self):
                 'time_grain_sqla': None,
             },
         }
-        result = self.datasource.query(parameters)
+        result = self.datasource.query(p)
         self.assertIsInstance(result, QueryResult)
         self.assertEqual(result.error_message, None)
         self.assertEqual(result.status, QueryStatus.SUCCESS)
@@ -212,7 +212,7 @@ def test_from_to_dttm(self):
         self.assertEqual(result.df, expected_df)
 
     def test_filter_eq_string(self):
-        parameters = {
+        p = {
             'groupby': ['project', 'region'],
             'metrics': ['sum__value'],
             'granularity': 'received',
@@ -229,21 +229,21 @@ def test_filter_eq_string(self):
                 'time_grain_sqla': None,
             },
         }
-        result = self.datasource.query(parameters)
+        result = self.datasource.query(p)
         self.assertIsInstance(result, QueryResult)
         self.assertEqual(result.error_message, None)
         self.assertEqual(result.status, QueryStatus.SUCCESS)
         expected_df = (self.df
                        .loc[self.df['district'] == 'District A']
-                       .groupby(parameters['groupby'])
+                       .groupby(p['groupby'])
                        .aggregate({'value': ['sum']})
                        .reset_index())
-        expected_df.columns = parameters['groupby'] + parameters['metrics']
+        expected_df.columns = p['groupby'] + p['metrics']
         expected_df = expected_df.sort_values(['sum__value'], ascending=False)
         self.assertEqual(result.df, expected_df)
 
     def test_filter_eq_num(self):
-        parameters = {
+        p = {
             'groupby': ['project', 'region'],
             'metrics': ['sum__value'],
             'granularity': 'received',
@@ -260,21 +260,21 @@ def test_filter_eq_num(self):
                 'time_grain_sqla': None,
             },
         }
-        result = self.datasource.query(parameters)
+        result = self.datasource.query(p)
         self.assertIsInstance(result, QueryResult)
         self.assertEqual(result.error_message, None)
         self.assertEqual(result.status, QueryStatus.SUCCESS)
         expected_df = (self.df
                        .loc[self.df['value'] == 85]
-                       .groupby(parameters['groupby'])
+                       .groupby(p['groupby'])
                        .aggregate({'value': ['sum']})
                        .reset_index())
-        expected_df.columns = parameters['groupby'] + parameters['metrics']
+        expected_df.columns = p['groupby'] + p['metrics']
         expected_df = expected_df.sort_values(['sum__value'], ascending=False)
         self.assertEqual(result.df, expected_df)
 
     def test_filter_eq_date(self):
-        parameters = {
+        p = {
             'groupby': ['project', 'region'],
             'metrics': ['sum__value'],
             'granularity': 'received',
@@ -291,21 +291,21 @@ def test_filter_eq_date(self):
                 'time_grain_sqla': None,
             },
         }
-        result = self.datasource.query(parameters)
+        result = self.datasource.query(p)
         self.assertIsInstance(result, QueryResult)
         self.assertEqual(result.error_message, None)
         self.assertEqual(result.status, QueryStatus.SUCCESS)
         expected_df = (self.df
                        .loc[self.df['received'] == datetime.datetime(2001, 2, 28)]
-                       .groupby(parameters['groupby'])
+                       .groupby(p['groupby'])
                        .aggregate({'value': ['sum']})
                        .reset_index())
-        expected_df.columns = parameters['groupby'] + parameters['metrics']
+        expected_df.columns = p['groupby'] + p['metrics']
         expected_df = expected_df.sort_values(['sum__value'], ascending=False)
         self.assertEqual(result.df, expected_df)
 
     def test_filter_gte(self):
-        parameters = {
+        p = {
             'groupby': ['project', 'region'],
             'metrics': ['sum__value'],
             'granularity': 'received',
@@ -322,21 +322,21 @@ def test_filter_gte(self):
                 'time_grain_sqla': None,
             },
         }
-        result = self.datasource.query(parameters)
+        result = self.datasource.query(p)
         self.assertIsInstance(result, QueryResult)
         self.assertEqual(result.error_message, None)
         self.assertEqual(result.status, QueryStatus.SUCCESS)
         expected_df = (self.df
                        .loc[self.df['value'] >= 70]
-                       .groupby(parameters['groupby'])
+                       .groupby(p['groupby'])
                        .aggregate({'value': ['sum']})
                        .reset_index())
-        expected_df.columns = parameters['groupby'] + parameters['metrics']
+        expected_df.columns = p['groupby'] + p['metrics']
         expected_df = expected_df.sort_values(['sum__value'], ascending=False)
         self.assertEqual(result.df, expected_df)
 
     def test_filter_in_num(self):
-        parameters = {
+        p = {
             'groupby': ['project', 'region'],
             'metrics': ['sum__value'],
             'granularity': 'received',
@@ -353,23 +353,23 @@ def test_filter_in_num(self):
                 'time_grain_sqla': None,
             },
         }
-        result = self.datasource.query(parameters)
+        result = self.datasource.query(p)
         self.assertIsInstance(result, QueryResult)
         self.assertEqual(result.error_message, None)
         self.assertEqual(result.status, QueryStatus.SUCCESS)
         expected_df = (self.df
                        .loc[~((self.df['value'] != 32) &
                               (self.df['value'] != 35))]
-                       .groupby(parameters['groupby'])
+                       .groupby(p['groupby'])
                        .aggregate({'value': ['sum']})
                        .reset_index())
-        expected_df.columns = parameters['groupby'] + parameters['metrics']
+        expected_df.columns = p['groupby'] + p['metrics']
         expected_df = (expected_df.sort_values(['sum__value'], ascending=False)
                        .reset_index(drop=True))
         self.assertEqual(result.df, expected_df)
 
     def test_filter_in_str(self):
-        parameters = {
+        p = {
             'groupby': ['project', 'region'],
             'metrics': ['sum__value'],
             'granularity': 'received',
@@ -386,23 +386,23 @@ def test_filter_in_str(self):
                 'time_grain_sqla': None,
             },
         }
-        result = self.datasource.query(parameters)
+        result = self.datasource.query(p)
         self.assertIsInstance(result, QueryResult)
         self.assertEqual(result.error_message, None)
         self.assertEqual(result.status, QueryStatus.SUCCESS)
         expected_df = (self.df
                        .loc[~((self.df['project'] != 'Project A') &
                               (self.df['project'] != 'Project C'))]
-                       .groupby(parameters['groupby'])
+                       .groupby(p['groupby'])
                        .aggregate({'value': ['sum']})
                        .reset_index())
-        expected_df.columns = parameters['groupby'] + parameters['metrics']
+        expected_df.columns = p['groupby'] + p['metrics']
         expected_df = (expected_df.sort_values(['sum__value'], ascending=False)
                        .reset_index(drop=True))
         self.assertEqual(result.df, expected_df)
 
     def test_filter_not_in_num(self):
-        parameters = {
+        p = {
             'groupby': ['project', 'region'],
             'metrics': ['sum__value'],
             'granularity': 'received',
@@ -419,23 +419,23 @@ def test_filter_not_in_num(self):
                 'time_grain_sqla': None,
             },
         }
-        result = self.datasource.query(parameters)
+        result = self.datasource.query(p)
         self.assertIsInstance(result, QueryResult)
         self.assertEqual(result.error_message, None)
         self.assertEqual(result.status, QueryStatus.SUCCESS)
         expected_df = (self.df
                        .loc[((self.df['value'] != 32) &
                              (self.df['value'] != 35))]
-                       .groupby(parameters['groupby'])
+                       .groupby(p['groupby'])
                        .aggregate({'value': ['sum']})
                        .reset_index())
-        expected_df.columns = parameters['groupby'] + parameters['metrics']
+        expected_df.columns = p['groupby'] + p['metrics']
         expected_df = (expected_df.sort_values(['sum__value'], ascending=False)
                        .reset_index(drop=True))
         self.assertEqual(result.df, expected_df)
 
     def test_filter_not_in_str(self):
-        parameters = {
+        p = {
             'groupby': ['project', 'region'],
             'metrics': ['sum__value'],
             'granularity': 'received',
@@ -452,22 +452,22 @@ def test_filter_not_in_str(self):
                 'time_grain_sqla': None,
             },
         }
-        result = self.datasource.query(parameters)
+        result = self.datasource.query(p)
         self.assertIsInstance(result, QueryResult)
         self.assertEqual(result.error_message, None)
         self.assertEqual(result.status, QueryStatus.SUCCESS)
         expected_df = (self.df
                        .loc[((self.df['project'] != 'Project A') &
                              (self.df['project'] != 'Project C'))]
-                       .groupby(parameters['groupby'])
+                       .groupby(p['groupby'])
                        .aggregate({'value': ['sum']})
                        .reset_index())
-        expected_df.columns = parameters['groupby'] + parameters['metrics']
+        expected_df.columns = p['groupby'] + p['metrics']
         expected_df = expected_df.sort_values(['sum__value'], ascending=False)
         self.assertEqual(result.df, expected_df)
 
     def test_columns_only(self):
-        parameters = {
+        p = {
             'groupby': [],
             'metrics': [],
             'granularity': 'received',
@@ -483,16 +483,16 @@ def test_columns_only(self):
             },
             'columns': ['project', 'region', 'received', 'value'],
         }
-        result = self.datasource.query(parameters)
+        result = self.datasource.query(p)
         self.assertIsInstance(result, QueryResult)
         self.assertEqual(result.error_message, None)
         self.assertEqual(result.status, QueryStatus.SUCCESS)
-        expected_df = self.df[parameters['columns']].copy()
+        expected_df = self.df[p['columns']].copy()
         expected_df['received'] = expected_df['received'].astype(str)
         self.assertEqual(result.df, expected_df)
 
     def test_orderby_with_columns(self):
-        parameters = {
+        p = {
             'groupby': [],
             'metrics': [],
             'granularity': 'received',
@@ -512,7 +512,7 @@ def test_orderby_with_columns(self):
                 ['region', True],
             ],
         }
-        result = self.datasource.query(parameters)
+        result = self.datasource.query(p)
         self.assertIsInstance(result, QueryResult)
         self.assertEqual(result.error_message, None)
         self.assertEqual(result.status, QueryStatus.SUCCESS)
@@ -520,12 +520,12 @@ def test_orderby_with_columns(self):
                        .sort_values(['project', 'region'],
                                     ascending=[False, True])
                        .reset_index(drop=True)
-                       [parameters['columns']])
+                       [p['columns']])
         expected_df['received'] = expected_df['received'].astype(str)
         self.assertEqual(result.df, expected_df)
 
     def test_groupby_only(self):
-        parameters = {
+        p = {
             'groupby': ['project', 'region'],
             'metrics': [],
             'granularity': 'received',
@@ -540,11 +540,11 @@ def test_groupby_only(self):
                 'time_grain_sqla': None,
             },
         }
-        result = self.datasource.query(parameters)
+        result = self.datasource.query(p)
         self.assertIsInstance(result, QueryResult)
         self.assertEqual(result.error_message, None)
         self.assertEqual(result.status, QueryStatus.SUCCESS)
-        expected_df = (self.df.groupby(parameters['groupby'])
+        expected_df = (self.df.groupby(p['groupby'])
                        .size()
                        .reset_index()
                        .sort_values([0], ascending=False)
@@ -552,7 +552,7 @@ def test_groupby_only(self):
         self.assertEqual(result.df, expected_df)
 
     def test_groupby_single_metric(self):
-        parameters = {
+        p = {
             'groupby': ['project', 'region'],
             'metrics': ['sum__value'],
             'granularity': 'received',
@@ -567,19 +567,19 @@ def test_groupby_single_metric(self):
                 'time_grain_sqla': None,
             },
         }
-        result = self.datasource.query(parameters)
+        result = self.datasource.query(p)
         self.assertIsInstance(result, QueryResult)
         self.assertEqual(result.error_message, None)
         self.assertEqual(result.status, QueryStatus.SUCCESS)
-        expected_df = (self.df.groupby(parameters['groupby'])['value']
+        expected_df = (self.df.groupby(p['groupby'])['value']
                        .sum()
                        .reset_index()
                        .sort_values(['value'], ascending=False))
-        expected_df.columns = parameters['groupby'] + parameters['metrics']
+        expected_df.columns = p['groupby'] + p['metrics']
         self.assertEqual(result.df, expected_df)
 
     def test_groupby_multiple_metrics(self):
-        parameters = {
+        p = {
             'groupby': ['project', 'region'],
             'metrics': ['sum__value', 'avg__value', 'value_percentage', 'ratio'],
             'granularity': 'received',
@@ -595,25 +595,25 @@ def test_groupby_multiple_metrics(self):
             },
         }
         self.df['ratio'] = self.df['value'] / self.df['value2']
-        result = self.datasource.query(parameters)
+        result = self.datasource.query(p)
         self.assertIsInstance(result, QueryResult)
         self.assertEqual(result.error_message, None)
         self.assertEqual(result.status, QueryStatus.SUCCESS)
-        expected_df = (self.df.groupby(parameters['groupby'])
+        expected_df = (self.df.groupby(p['groupby'])
                        .aggregate(OrderedDict([('value', ['sum', 'mean']),
                                                ('ratio', ['mean'])])))
-        expected_df['value_percentage'] = (self.df.groupby(parameters['groupby'])
+        expected_df['value_percentage'] = (self.df.groupby(p['groupby'])
                                            .apply(lambda x: sum(x['value']) / sum(x['value'] + x['value2'])))
         expected_df = expected_df.reset_index()
-        expected_df.columns = (parameters['groupby'] +
+        expected_df.columns = (p['groupby'] +
                                ['sum__value', 'avg__value',
                                 'ratio', 'value_percentage'])
-        expected_df = (expected_df[parameters['groupby'] + parameters['metrics']]
+        expected_df = (expected_df[p['groupby'] + p['metrics']]
                        .sort_values(['sum__value'], ascending=False))
         self.assertEqual(result.df, expected_df)
 
     def test_groupby_ratio_metric(self):
-        parameters = {
+        p = {
             'groupby': ['project', 'region'],
             'metrics': ['ratio'],
             'granularity': 'received',
@@ -629,19 +629,19 @@ def test_groupby_ratio_metric(self):
             },
         }
         self.df['ratio'] = self.df['value'] / self.df['value2']
-        result = self.datasource.query(parameters)
+        result = self.datasource.query(p)
         self.assertIsInstance(result, QueryResult)
         self.assertEqual(result.error_message, None)
         self.assertEqual(result.status, QueryStatus.SUCCESS)
-        expected_df = (self.df.groupby(parameters['groupby'])['ratio']
+        expected_df = (self.df.groupby(p['groupby'])['ratio']
                        .mean()
                        .reset_index()
                        .sort_values(['ratio'], ascending=False))
-        expected_df.columns = parameters['groupby'] + parameters['metrics']
+        expected_df.columns = p['groupby'] + p['metrics']
         self.assertEqual(result.df, expected_df)
 
     def test_groupby_value_percentage_metric(self):
-        parameters = {
+        p = {
             'groupby': ['project', 'region'],
             'metrics': ['value_percentage'],
             'granularity': 'received',
@@ -656,20 +656,20 @@ def test_groupby_value_percentage_metric(self):
                 'time_grain_sqla': None,
             },
         }
-        result = self.datasource.query(parameters)
+        result = self.datasource.query(p)
         self.assertIsInstance(result, QueryResult)
         self.assertEqual(result.error_message, None)
         self.assertEqual(result.status, QueryStatus.SUCCESS)
-        expected_df = (self.df.groupby(parameters['groupby'])
+        expected_df = (self.df.groupby(p['groupby'])
                        .apply(lambda x: sum(x['value']) /
                                         sum(x['value'] + x['value2']))
                        .reset_index()
                        .sort_values([0], ascending=False))
-        expected_df.columns = parameters['groupby'] + parameters['metrics']
+        expected_df.columns = p['groupby'] + p['metrics']
         self.assertEqual(result.df, expected_df)
 
     def test_groupby_category_percentage_metric(self):
-        parameters = {
+        p = {
             'groupby': ['project', 'region'],
             'metrics': ['category_percentage'],
             'granularity': 'received',
@@ -684,21 +684,21 @@ def test_groupby_category_percentage_metric(self):
                 'time_grain_sqla': None,
             },
         }
-        result = self.datasource.query(parameters)
+        result = self.datasource.query(p)
         self.assertIsInstance(result, QueryResult)
         self.assertEqual(result.error_message, None)
         self.assertEqual(result.status, QueryStatus.SUCCESS)
-        expected_df = (self.df.groupby(parameters['groupby'])['category']
+        expected_df = (self.df.groupby(p['groupby'])['category']
                        .value_counts(normalize=True)
-                       .reset_index(parameters['groupby'])
+                       .reset_index(p['groupby'])
                        .loc['CategoryA']
                        .reset_index(drop=True)
                        .sort_values(['category'], ascending=False))
-        expected_df.columns = parameters['groupby'] + parameters['metrics']
+        expected_df.columns = p['groupby'] + p['metrics']
         self.assertEqual(result.df, expected_df)
 
     def test_groupby_ascending_order(self):
-        parameters = {
+        p = {
             'groupby': ['project', 'region'],
             'metrics': ['sum__value', 'avg__value'],
             'granularity': 'received',
@@ -714,19 +714,19 @@ def test_groupby_ascending_order(self):
             },
             'order_desc': False,
         }
-        result = self.datasource.query(parameters)
+        result = self.datasource.query(p)
         self.assertIsInstance(result, QueryResult)
         self.assertEqual(result.error_message, None)
         self.assertEqual(result.status, QueryStatus.SUCCESS)
-        expected_df = (self.df.groupby(parameters['groupby'])
+        expected_df = (self.df.groupby(p['groupby'])
                        .aggregate({'value': ['sum', 'mean']})
                        .reset_index())
-        expected_df.columns = parameters['groupby'] + parameters['metrics']
+        expected_df.columns = p['groupby'] + p['metrics']
        expected_df = expected_df.sort_values(['sum__value'], ascending=True)
         self.assertEqual(result.df, expected_df)
 
     def test_timeseries_single_metric(self):
-        parameters = {
+        p = {
             'groupby': [],
             'metrics': ['sum__value'],
             'granularity': 'received',
@@ -744,27 +744,27 @@ def test_timeseries_single_metric(self):
             },
             'order_desc': True,
         }
-        result = self.datasource.query(parameters)
+        result = self.datasource.query(p)
         self.assertIsInstance(result, QueryResult)
         self.assertEqual(result.error_message, None)
         self.assertEqual(result.status, QueryStatus.SUCCESS)
-        time_grain = PandasDatasource.GRAINS[parameters['extras']['time_grain_sqla']]
-        expected_df = (self.df.groupby(parameters['groupby'] +
-                                       [pd.Grouper(key=parameters['granularity'],
+        time_grain = PandasDatasource.GRAINS[p['extras']['time_grain_sqla']]
+        expected_df = (self.df.groupby(p['groupby'] +
+                                       [pd.Grouper(key=p['granularity'],
                                                    freq=time_grain)])
                        .aggregate({'value': ['sum']})
                        .reset_index())
-        expected_df.columns = (parameters['groupby'] +
+        expected_df.columns = (p['groupby'] +
                                ['__timestamp'] +
-                               parameters['metrics'])
+                               p['metrics'])
         expected_df['__timestamp'] = expected_df['__timestamp'].astype(str)
-        expected_df = (expected_df.sort_values(parameters['metrics'][0],
-                                               ascending=(not parameters['order_desc']))
+        expected_df = (expected_df.sort_values(p['metrics'][0],
+                                               ascending=(not p['order_desc']))
                        .reset_index(drop=True))
         self.assertEqual(result.df, expected_df)
 
     def test_timeseries_multiple_metrics(self):
-        parameters = {
+        p = {
             'groupby': [],
             'metrics': ['sum__value', 'avg__value'],
             'granularity': 'received',
@@ -782,26 +782,26 @@ def test_timeseries_multiple_metrics(self):
             },
             'order_desc': True,
         }
-        result = self.datasource.query(parameters)
+        result = self.datasource.query(p)
         self.assertIsInstance(result, QueryResult)
         self.assertEqual(result.error_message, None)
         self.assertEqual(result.status, QueryStatus.SUCCESS)
-        time_grain = PandasDatasource.GRAINS[parameters['extras']['time_grain_sqla']]
-        expected_df = (self.df.groupby(parameters['groupby'] +
-                                       [pd.Grouper(key=parameters['granularity'],
+        time_grain = PandasDatasource.GRAINS[p['extras']['time_grain_sqla']]
+        expected_df = (self.df.groupby(p['groupby'] +
+                                       [pd.Grouper(key=p['granularity'],
                                                    freq=time_grain)])
                        .aggregate({'value': ['sum', 'mean']})
                        .reset_index())
-        expected_df.columns = (parameters['groupby'] +
+        expected_df.columns = (p['groupby'] +
                                ['__timestamp'] +
-                               parameters['metrics'])
+                               p['metrics'])
         expected_df['__timestamp'] = expected_df['__timestamp'].astype(str)
-        expected_df = (expected_df.sort_values(parameters['metrics'][0],
-                                               ascending=(not parameters['order_desc'])))
+        expected_df = (expected_df.sort_values(p['metrics'][0],
+                                               ascending=(not p['order_desc'])))
         self.assertEqual(result.df, expected_df)
 
     def test_timeseries_groupby(self):
-        parameters = {
+        p = {
             'groupby': ['project'],
             'metrics': ['sum__value', 'avg__value'],
             'granularity': 'received',
@@ -818,26 +818,26 @@ def test_timeseries_groupby(self):
                 'time_grain_sqla': 'day',
             },
         }
-        result = self.datasource.query(parameters)
+        result = self.datasource.query(p)
         self.assertIsInstance(result, QueryResult)
         self.assertEqual(result.error_message, None)
         self.assertEqual(result.status, QueryStatus.SUCCESS)
-        time_grain = PandasDatasource.GRAINS[parameters['extras']['time_grain_sqla']]
-        expected_df = (self.df.groupby(parameters['groupby'] +
-                                       [pd.Grouper(key=parameters['granularity'],
+        time_grain = PandasDatasource.GRAINS[p['extras']['time_grain_sqla']]
+        expected_df = (self.df.groupby(p['groupby'] +
+                                       [pd.Grouper(key=p['granularity'],
                                                    freq=time_grain)])
                        .aggregate({'value': ['sum', 'mean']})
                        .reset_index())
-        expected_df.columns = (parameters['groupby'] +
+        expected_df.columns = (p['groupby'] +
                                ['__timestamp'] +
-                               parameters['metrics'])
+                               p['metrics'])
         expected_df['__timestamp'] = expected_df['__timestamp'].astype(str)
         expected_df = (expected_df.sort_values(['sum__value'], ascending=False)
                        .reset_index(drop=True))
         self.assertEqual(result.df, expected_df)
 
     def test_timeseries_limit(self):
-        parameters = {
+        p = {
             'groupby': ['project', 'district'],
             'metrics': ['sum__value'],
             'granularity': 'received',
@@ -855,32 +855,32 @@ def test_timeseries_limit(self):
             },
             'order_desc': True,
         }
-        result = self.datasource.query(parameters)
+        result = self.datasource.query(p)
         self.assertIsInstance(result, QueryResult)
         self.assertEqual(result.error_message, None)
         self.assertEqual(result.status, QueryStatus.SUCCESS)
-        time_grain = PandasDatasource.GRAINS[parameters['extras']['time_grain_sqla']]
-        limit_df = (self.df.groupby(parameters['groupby'])
+        time_grain = PandasDatasource.GRAINS[p['extras']['time_grain_sqla']]
+        limit_df = (self.df.groupby(p['groupby'])
                     .aggregate({'value': 'mean'})
-                    .sort_values('value', ascending=(not parameters['order_desc']))
-                    .iloc[:parameters['timeseries_limit']])
-        source_df = self.df.set_index(parameters['groupby'])
+                    .sort_values('value', ascending=(not p['order_desc']))
+                    .iloc[:p['timeseries_limit']])
+        source_df = self.df.set_index(p['groupby'])
         expected_df = (source_df[source_df.index.isin(limit_df.index)]
-                       .groupby(parameters['groupby'] + [pd.Grouper(key=parameters['granularity'],
+                       .groupby(p['groupby'] + [pd.Grouper(key=p['granularity'],
                                                          freq=time_grain)])
                        .aggregate({'value': ['sum']})
                        .reset_index())
-        expected_df.columns = (parameters['groupby'] +
+        expected_df.columns = (p['groupby'] +
                                ['__timestamp'] +
-                               parameters['metrics'])
+                               p['metrics'])
         expected_df['__timestamp'] = expected_df['__timestamp'].astype(str)
         expected_df = (expected_df.sort_values(['sum__value'],
-                                               ascending=(not parameters['order_desc']))
+                                               ascending=(not p['order_desc']))
                        .reset_index(drop=True))
         self.assertEqual(result.df, expected_df)
 
     def test_timeseries_limit_ascending_order(self):
-        parameters = {
+        p = {
             'groupby': ['project', 'district'],
             'metrics': ['sum__value'],
             'granularity': 'received',
@@ -898,27 +898,27 @@ def test_timeseries_limit_ascending_order(self):
             },
             'order_desc': False,
         }
-        result = self.datasource.query(parameters)
+        result = self.datasource.query(p)
         self.assertIsInstance(result, QueryResult)
         self.assertEqual(result.error_message, None)
         self.assertEqual(result.status, QueryStatus.SUCCESS)
-        time_grain = PandasDatasource.GRAINS[parameters['extras']['time_grain_sqla']]
-        limit_df = (self.df.groupby(parameters['groupby'])
+        time_grain = PandasDatasource.GRAINS[p['extras']['time_grain_sqla']]
+        limit_df = (self.df.groupby(p['groupby'])
                     .aggregate({'value': 'mean'})
-                    .sort_values('value', ascending=(not parameters['order_desc']))
-                    .iloc[:parameters['timeseries_limit']])
-        source_df = self.df.set_index(parameters['groupby'])
+                    .sort_values('value', ascending=(not p['order_desc']))
+                    .iloc[:p['timeseries_limit']])
+        source_df = self.df.set_index(p['groupby'])
         expected_df = (source_df[source_df.index.isin(limit_df.index)]
-                       .groupby(parameters['groupby'] + [pd.Grouper(key=parameters['granularity'],
+                       .groupby(p['groupby'] + [pd.Grouper(key=p['granularity'],
                                                          freq=time_grain)])
                        .aggregate({'value': ['sum']})
                        .reset_index())
-        expected_df.columns = (parameters['groupby'] +
+        expected_df.columns = (p['groupby'] +
                                ['__timestamp'] +
-                               parameters['metrics'])
+                               p['metrics'])
         expected_df['__timestamp'] = expected_df['__timestamp'].astype(str)
         expected_df = (expected_df.sort_values(['sum__value'],
-                                               ascending=(not parameters['order_desc']))
+                                               ascending=(not p['order_desc']))
                        .reset_index(drop=True))
         self.assertEqual(result.df, expected_df)
 
@@ -1012,7 +1012,7 @@ def calc_category_percentage(group):
         self.datasource.calc_category_percentage = calc_category_percentage
 
     def test_post_aggregation_filter(self):
-        parameters = {
+        p = {
             'groupby': ['project', 'region'],
             'metrics': ['sum__value'],
             'granularity': 'received',
@@ -1029,15 +1029,15 @@ def test_post_aggregation_filter(self):
                 ],
             },
         }
-        result = self.datasource.query(parameters)
+        result = self.datasource.query(p)
         self.assertIsInstance(result, QueryResult)
         self.assertEqual(result.error_message, None)
         self.assertEqual(result.status, QueryStatus.SUCCESS)
         expected_df = (self.df
-                       .groupby(parameters['groupby'])
+                       .groupby(p['groupby'])
                        .aggregate({'value': ['sum']})
                        .reset_index())
-        expected_df.columns = parameters['groupby'] + parameters['metrics']
+        expected_df.columns = p['groupby'] + p['metrics']
         expected_df = (expected_df.loc[expected_df['sum__value'] >= 150]
                        .sort_values(['sum__value'], ascending=False))
         self.assertEqual(result.df, expected_df)