[3541] Augmenting datasources uniqueness constraints #3583

Merged 1 commit on Nov 20, 2017
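
In short, the PR relaxes the uniqueness rule on datasources.datasource_name: a datasource name now only has to be unique within its cluster, and the columns and metrics tables reference datasources by surrogate key instead of by name. Below is a minimal, self-contained sketch of the resulting shape; it is not the PR's code, and the class names (Datasource, DatasourceColumn) are illustrative stand-ins for the real DruidDatasource and DruidColumn models in superset/connectors/druid/models.py.

from sqlalchemy import (
    Column, ForeignKey, Integer, String, UniqueConstraint, create_engine,
)
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class Datasource(Base):
    __tablename__ = 'datasources'
    # Uniqueness is now scoped to the cluster rather than global.
    __table_args__ = (UniqueConstraint('cluster_name', 'datasource_name'),)
    id = Column(Integer, primary_key=True)
    cluster_name = Column(String(250))
    datasource_name = Column(String(255))  # previously declared unique=True

class DatasourceColumn(Base):
    __tablename__ = 'columns'
    id = Column(Integer, primary_key=True)
    # Child rows now point at the surrogate key rather than the name.
    datasource_id = Column(Integer, ForeignKey('datasources.id'))
    column_name = Column(String(255))

Base.metadata.create_all(create_engine('sqlite://'))
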
47 changes: 23 additions & 24 deletions superset/connectors/druid/models.py
@@ -22,7 +22,7 @@
from six import string_types
import sqlalchemy as sa
from sqlalchemy import (
Boolean, Column, DateTime, ForeignKey, Integer, or_, String, Text,
Boolean, Column, DateTime, ForeignKey, Integer, or_, String, Text, UniqueConstraint,
)
from sqlalchemy.orm import backref, relationship

@@ -169,7 +169,7 @@ def refresh_async(self, datasource_names, merge_flag, refreshAll):
if cols:
col_objs_list = (
session.query(DruidColumn)
.filter(DruidColumn.datasource_name == datasource.datasource_name)
.filter(DruidColumn.datasource_id == datasource.id)
.filter(or_(DruidColumn.column_name == col for col in cols))
)
col_objs = {col.column_name: col for col in col_objs_list}
@@ -179,7 +179,7 @@ def refresh_async(self, datasource_names, merge_flag, refreshAll):
col_obj = col_objs.get(col, None)
if not col_obj:
col_obj = DruidColumn(
datasource_name=datasource.datasource_name,
datasource_id=datasource.id,
column_name=col)
with session.no_autoflush:
session.add(col_obj)
@@ -220,9 +220,9 @@ class DruidColumn(Model, BaseColumn):

__tablename__ = 'columns'

datasource_name = Column(
String(255),
ForeignKey('datasources.datasource_name'))
datasource_id = Column(
Integer,
ForeignKey('datasources.id'))
# Setting enable_typechecks=False disables polymorphic inheritance.
datasource = relationship(
'DruidDatasource',
@@ -231,7 +231,7 @@ class DruidColumn(Model, BaseColumn):
dimension_spec_json = Column(Text)

export_fields = (
'datasource_name', 'column_name', 'is_active', 'type', 'groupby',
'datasource_id', 'column_name', 'is_active', 'type', 'groupby',
'count_distinct', 'sum', 'avg', 'max', 'min', 'filterable',
'description', 'dimension_spec_json',
)
@@ -334,23 +334,22 @@ def generate_metrics(self):
metrics = self.get_metrics()
dbmetrics = (
db.session.query(DruidMetric)
.filter(DruidCluster.cluster_name == self.datasource.cluster_name)
.filter(DruidMetric.datasource_name == self.datasource_name)
.filter(DruidMetric.datasource_id == self.datasource_id)
.filter(or_(
DruidMetric.metric_name == m for m in metrics
))
)
dbmetrics = {metric.metric_name: metric for metric in dbmetrics}
for metric in metrics.values():
metric.datasource_name = self.datasource_name
metric.datasource_id = self.datasource_id
if not dbmetrics.get(metric.metric_name, None):
db.session.add(metric)

@classmethod
def import_obj(cls, i_column):
def lookup_obj(lookup_column):
return db.session.query(DruidColumn).filter(
DruidColumn.datasource_name == lookup_column.datasource_name,
DruidColumn.datasource_id == lookup_column.datasource_id,
DruidColumn.column_name == lookup_column.column_name).first()

return import_util.import_simple_obj(db.session, i_column, lookup_obj)
@@ -361,9 +360,9 @@ class DruidMetric(Model, BaseMetric):
"""ORM object referencing Druid metrics for a datasource"""

__tablename__ = 'metrics'
datasource_name = Column(
String(255),
ForeignKey('datasources.datasource_name'))
datasource_id = Column(
Integer,
ForeignKey('datasources.id'))
# Setting enable_typechecks=False disables polymorphic inheritance.
datasource = relationship(
'DruidDatasource',
@@ -372,7 +371,7 @@ class DruidMetric(Model, BaseMetric):
json = Column(Text)

export_fields = (
'metric_name', 'verbose_name', 'metric_type', 'datasource_name',
'metric_name', 'verbose_name', 'metric_type', 'datasource_id',
'json', 'description', 'is_restricted', 'd3format',
)

@@ -400,7 +399,7 @@ def perm(self):
def import_obj(cls, i_metric):
def lookup_obj(lookup_metric):
return db.session.query(DruidMetric).filter(
DruidMetric.datasource_name == lookup_metric.datasource_name,
DruidMetric.datasource_id == lookup_metric.datasource_id,
DruidMetric.metric_name == lookup_metric.metric_name).first()
return import_util.import_simple_obj(db.session, i_metric, lookup_obj)

@@ -420,7 +419,7 @@ class DruidDatasource(Model, BaseDatasource):
baselink = 'druiddatasourcemodelview'

# Columns
datasource_name = Column(String(255), unique=True)
datasource_name = Column(String(255))
is_hidden = Column(Boolean, default=False)
fetch_values_from = Column(String(100))
cluster_name = Column(
@@ -432,6 +431,7 @@ class DruidDatasource(Model, BaseDatasource):
sm.user_model,
backref=backref('datasources', cascade='all, delete-orphan'),
foreign_keys=[user_id])
__table_args__ = (UniqueConstraint('cluster_name', 'datasource_name'),)

export_fields = (
'datasource_name', 'is_hidden', 'description', 'default_endpoint',
@@ -519,7 +519,7 @@ def import_obj(cls, i_datasource, import_time=None):
superset instances. Audit metadata isn't copied over.
"""
def lookup_datasource(d):
return db.session.query(DruidDatasource).join(DruidCluster).filter(
return db.session.query(DruidDatasource).filter(
DruidDatasource.datasource_name == d.datasource_name,
DruidCluster.cluster_name == d.cluster_name,
).first()
@@ -616,13 +616,12 @@ def generate_metrics_for(self, columns):
metrics.update(col.get_metrics())
dbmetrics = (
db.session.query(DruidMetric)
.filter(DruidCluster.cluster_name == self.cluster_name)
.filter(DruidMetric.datasource_name == self.datasource_name)
.filter(DruidMetric.datasource_id == self.id)
.filter(or_(DruidMetric.metric_name == m for m in metrics))
)
dbmetrics = {metric.metric_name: metric for metric in dbmetrics}
for metric in metrics.values():
metric.datasource_name = self.datasource_name
metric.datasource_id = self.id
if not dbmetrics.get(metric.metric_name, None):
with db.session.no_autoflush:
db.session.add(metric)
@@ -657,15 +656,15 @@ def sync_to_db_from_config(
dimensions = druid_config['dimensions']
col_objs = (
session.query(DruidColumn)
.filter(DruidColumn.datasource_name == druid_config['name'])
.filter(DruidColumn.datasource_id == datasource.id)
.filter(or_(DruidColumn.column_name == dim for dim in dimensions))
)
col_objs = {col.column_name: col for col in col_objs}
for dim in dimensions:
col_obj = col_objs.get(dim, None)
if not col_obj:
col_obj = DruidColumn(
datasource_name=druid_config['name'],
datasource_id=datasource.id,
column_name=dim,
groupby=True,
filterable=True,
Expand All @@ -677,7 +676,7 @@ def sync_to_db_from_config(
# Import Druid metrics
metric_objs = (
session.query(DruidMetric)
.filter(DruidMetric.datasource_name == druid_config['name'])
.filter(DruidMetric.datasource_id == datasource.id)
.filter(or_(DruidMetric.metric_name == spec['name']
for spec in druid_config['metrics_spec']))
)
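
Taken together, the models.py changes re-key DruidColumn and DruidMetric lookups on the parent datasource's id rather than its name. Below is a minimal sketch of the resulting query pattern; it is not code from this PR, it assumes an initialized Superset app so that db.session is usable, and it uses .in_() where the diff builds the same condition with an or_(...) generator.

from superset import db
from superset.connectors.druid.models import DruidColumn

def lookup_columns(datasource, column_names):
    """Fetch existing DruidColumn rows for a datasource, keyed by column_name,
    filtering on the new datasource_id foreign key instead of datasource_name."""
    query = (
        db.session.query(DruidColumn)
        .filter(DruidColumn.datasource_id == datasource.id)
        .filter(DruidColumn.column_name.in_(column_names))
    )
    return {col.column_name: col for col in query}
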
201 changes: 201 additions & 0 deletions superset/migrations/versions/4736ec66ce19_.py
@@ -0,0 +1,201 @@
"""empty message

Revision ID: 4736ec66ce19
Revises: f959a6652acd
Create Date: 2017-10-03 14:37:01.376578

"""

# revision identifiers, used by Alembic.
revision = '4736ec66ce19'
down_revision = 'f959a6652acd'

from alembic import op
import sqlalchemy as sa
from sqlalchemy.exc import OperationalError

from superset.utils import (
generic_find_fk_constraint_name,
generic_find_fk_constraint_names,
generic_find_uq_constraint_name,
)


conv = {
'fk': 'fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s',
'uq': 'uq_%(table_name)s_%(column_0_name)s',
}

# Helper table for database migrations using minimal schema.
datasources = sa.Table(
'datasources',
sa.MetaData(),
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('datasource_name', sa.String(255)),
)

bind = op.get_bind()
insp = sa.engine.reflection.Inspector.from_engine(bind)


def upgrade():

# Add the new less restrictive uniqueness constraint.
with op.batch_alter_table('datasources', naming_convention=conv) as batch_op:
batch_op.create_unique_constraint(
'uq_datasources_cluster_name',
['cluster_name', 'datasource_name'],
)

# Augment the tables which have a foreign key constraint related to the
# datasources.datasource_name column.
for foreign in ['columns', 'metrics']:
with op.batch_alter_table(foreign, naming_convention=conv) as batch_op:

# Add the datasource_id column with the relevant constraints.
batch_op.add_column(sa.Column('datasource_id', sa.Integer))

batch_op.create_foreign_key(
'fk_{}_datasource_id_datasources'.format(foreign),
'datasources',
['datasource_id'],
['id'],
)

# Helper table for database migration using minimal schema.
table = sa.Table(
foreign,
sa.MetaData(),
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('datasource_name', sa.String(255)),
sa.Column('datasource_id', sa.Integer),
)

# Migrate the existing data.
for datasource in bind.execute(datasources.select()):
bind.execute(
table.update().where(
table.c.datasource_name == datasource.datasource_name,
).values(
datasource_id=datasource.id,
),
)

with op.batch_alter_table(foreign, naming_convention=conv) as batch_op:

# Drop the datasource_name column and associated constraints. Note
# due to prior revisions (1226819ee0e3, 3b626e2a6783) there may
# incorrectly be multiple duplicate constraints.
names = generic_find_fk_constraint_names(
foreign,
{'datasource_name'},
'datasources',
insp,
)

for name in names:
batch_op.drop_constraint(
name or 'fk_{}_datasource_name_datasources'.format(foreign),
type_='foreignkey',
)

batch_op.drop_column('datasource_name')

# Drop the old more restrictive uniqueness constraint.
with op.batch_alter_table('datasources', naming_convention=conv) as batch_op:
batch_op.drop_constraint(
generic_find_uq_constraint_name(
'datasources',
{'datasource_name'},
insp,
) or 'uq_datasources_datasource_name',
type_='unique',
)


def downgrade():

# Add the new more restrictive uniqueness constraint which is required by
# the foreign key constraints. Note this operation will fail if the
# datasources.datasource_name column is no longer unique.
with op.batch_alter_table('datasources', naming_convention=conv) as batch_op:
batch_op.create_unique_constraint(
'uq_datasources_datasource_name',
['datasource_name'],
)

# Augment the tables which have a foreign key constraint related to the
# datasources.datasource_id column.
for foreign in ['columns', 'metrics']:
with op.batch_alter_table(foreign, naming_convention=conv) as batch_op:

# Add the datasource_name column with the relevant constraints.
batch_op.add_column(sa.Column('datasource_name', sa.String(255)))

batch_op.create_foreign_key(
'fk_{}_datasource_name_datasources'.format(foreign),
'datasources',
['datasource_name'],
['datasource_name'],
)

# Helper table for database migration using minimal schema.
table = sa.Table(
foreign,
sa.MetaData(),
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('datasource_name', sa.String(255)),
sa.Column('datasource_id', sa.Integer),
)

# Migrate the existing data.
for datasource in bind.execute(datasources.select()):
bind.execute(
table.update().where(
table.c.datasource_id == datasource.id,
).values(
datasource_name=datasource.datasource_name,
),
)

with op.batch_alter_table(foreign, naming_convention=conv) as batch_op:

# Drop the datasource_id column and associated constraint.
batch_op.drop_constraint(
'fk_{}_datasource_id_datasources'.format(foreign),
type_='foreignkey',
)

batch_op.drop_column('datasource_id')

with op.batch_alter_table('datasources', naming_convention=conv) as batch_op:

# Prior to dropping the uniqueness constraint, the foreign key
# associated with the cluster_name column needs to be dropped.
batch_op.drop_constraint(
generic_find_fk_constraint_name(
'datasources',
{'cluster_name'},
'clusters',
insp,
) or 'fk_datasources_cluster_name_clusters',
type_='foreignkey',
)

# Drop the old less restrictive uniqueness constraint.
batch_op.drop_constraint(
generic_find_uq_constraint_name(
'datasources',
{'cluster_name', 'datasource_name'},
insp,
) or 'uq_datasources_cluster_name',
type_='unique',
)

# Re-create the foreign key associated with the cluster_name column.
batch_op.create_foreign_key(
'fk_datasources_cluster_name_clusters',
'clusters',
['cluster_name'],
['cluster_name'],
)
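
The migration relies on generic_find_fk_constraint_name(s) and generic_find_uq_constraint_name from superset.utils to locate constraint names that earlier revisions may have created outside the naming convention declared above. As a rough illustration of the kind of reflection such a helper performs (a sketch against SQLAlchemy's Inspector API, not the actual superset.utils implementation):

from sqlalchemy.engine.reflection import Inspector

def find_uq_constraint_name(table, columns, insp):
    """Return the name of a unique constraint on `table` covering exactly the
    given set of column names, or None if no such constraint is found."""
    for uq in insp.get_unique_constraints(table):
        if set(uq['column_names']) == columns:
            return uq['name']
    return None

# e.g. insp = Inspector.from_engine(engine)
#      find_uq_constraint_name('datasources', {'datasource_name'}, insp)

In practice the revision is applied through Superset's normal Alembic flow (superset db upgrade); as the comment in downgrade() notes, the reverse path only succeeds while datasource names are still globally unique.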