Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sqllab] improve Hive support #3187

Merged
merged 2 commits into from
Jul 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def get_git_sha():
'pandas==0.20.2',
'parsedatetime==2.0.0',
'pydruid==0.3.1',
'PyHive>=0.3.0',
'PyHive>=0.4.0',
'python-dateutil==2.6.0',
'requests==2.17.3',
'simplejson==3.10.0',
Expand Down
14 changes: 14 additions & 0 deletions superset/assets/javascripts/SqlLab/components/ResultSet.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ export default class ResultSet extends React.PureComponent {
}
if (['running', 'pending', 'fetching'].indexOf(query.state) > -1) {
let progressBar;
let trackingUrl;
if (query.progress > 0 && query.state === 'running') {
progressBar = (
<ProgressBar
Expand All @@ -163,11 +164,24 @@ export default class ResultSet extends React.PureComponent {
label={`${query.progress}%`}
/>);
}
if (query.trackingUrl) {
trackingUrl = (
<Button
bsSize="small"
onClick={() => { window.open(query.trackingUrl); }}
>
Track Job
</Button>
);
}
return (
<div>
<img className="loading" alt="Loading..." src="/static/assets/images/loading.gif" />
<QueryStateLabel query={query} />
{progressBar}
<div>
{trackingUrl}
</div>
</div>
);
} else if (query.state === 'failed') {
Expand Down
1 change: 1 addition & 0 deletions superset/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ class CeleryConfig(object):
CELERY_IMPORTS = ('superset.sql_lab', )
CELERY_RESULT_BACKEND = 'db+sqlite:///celery_results.sqlite'
CELERY_ANNOTATIONS = {'tasks.add': {'rate_limit': '10/s'}}
CELERYD_LOG_LEVEL = 'DEBUG'
CELERY_CONFIG = CeleryConfig
"""
CELERY_CONFIG = None
Expand Down
78 changes: 53 additions & 25 deletions superset/db_engine_specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,21 @@ class HiveEngineSpec(PrestoEngineSpec):
engine = 'hive'
cursor_execute_kwargs = {'async': True}

# Scoping regex at class level to avoid recompiling
# 17/02/07 19:36:38 INFO ql.Driver: Total jobs = 5
jobs_stats_r = re.compile(
r'.*INFO.*Total jobs = (?P<max_jobs>[0-9]+)')
# 17/02/07 19:37:08 INFO ql.Driver: Launching Job 2 out of 5
launching_job_r = re.compile(
'.*INFO.*Launching Job (?P<job_number>[0-9]+) out of '
'(?P<max_jobs>[0-9]+)')
# 17/02/07 19:36:58 INFO exec.Task: 2017-02-07 19:36:58,152 Stage-18
# map = 0%, reduce = 0%
stage_progress_r = re.compile(
r'.*INFO.*Stage-(?P<stage_number>[0-9]+).*'
r'map = (?P<map_progress>[0-9]+)%.*'
r'reduce = (?P<reduce_progress>[0-9]+)%.*')

@classmethod
def patch(cls):
from pyhive import hive
Expand Down Expand Up @@ -665,38 +680,27 @@ def adjust_database_uri(cls, uri, selected_schema=None):
return uri

@classmethod
def progress(cls, logs):
# 17/02/07 19:36:38 INFO ql.Driver: Total jobs = 5
jobs_stats_r = re.compile(
r'.*INFO.*Total jobs = (?P<max_jobs>[0-9]+)')
# 17/02/07 19:37:08 INFO ql.Driver: Launching Job 2 out of 5
launching_job_r = re.compile(
'.*INFO.*Launching Job (?P<job_number>[0-9]+) out of '
'(?P<max_jobs>[0-9]+)')
# 17/02/07 19:36:58 INFO exec.Task: 2017-02-07 19:36:58,152 Stage-18
# map = 0%, reduce = 0%
stage_progress = re.compile(
r'.*INFO.*Stage-(?P<stage_number>[0-9]+).*'
r'map = (?P<map_progress>[0-9]+)%.*'
r'reduce = (?P<reduce_progress>[0-9]+)%.*')
total_jobs = None
def progress(cls, log_lines):
total_jobs = 1 # assuming there's at least 1 job
current_job = None
stages = {}
lines = logs.splitlines()
for line in lines:
match = jobs_stats_r.match(line)
for line in log_lines:
match = cls.jobs_stats_r.match(line)
if match:
total_jobs = int(match.groupdict()['max_jobs'])
match = launching_job_r.match(line)
total_jobs = int(match.groupdict()['max_jobs']) or 1
match = cls.launching_job_r.match(line)
if match:
current_job = int(match.groupdict()['job_number'])
stages = {}
match = stage_progress.match(line)
match = cls.stage_progress_r.match(line)
if match:
stage_number = int(match.groupdict()['stage_number'])
map_progress = int(match.groupdict()['map_progress'])
reduce_progress = int(match.groupdict()['reduce_progress'])
stages[stage_number] = (map_progress + reduce_progress) / 2
logging.info(
"Progress detail: {}, "
"total jobs: {}".format(stages, total_jobs))

if not total_jobs or not current_job:
return 0
Expand All @@ -708,6 +712,13 @@ def progress(cls, logs):
)
return int(progress)

@classmethod
def get_tracking_url(cls, log_lines):
lkp = "Tracking URL = "
for line in log_lines:
if lkp in line:
return line.split(lkp)[1]

@classmethod
def handle_cursor(cls, cursor, query, session):
"""Updates progress information"""
Expand All @@ -717,18 +728,35 @@ def handle_cursor(cls, cursor, query, session):
hive.ttypes.TOperationState.RUNNING_STATE,
)
polled = cursor.poll()
last_log_line = 0
tracking_url = None
while polled.operationState in unfinished_states:
query = session.query(type(query)).filter_by(id=query.id).one()
if query.status == QueryStatus.STOPPED:
cursor.cancel()
break

logs = cursor.fetch_logs()
if logs:
progress = cls.progress(logs)
resp = cursor.fetch_logs()
if resp and resp.log:
log = resp.log or ''
log_lines = resp.log.splitlines()
logging.info("\n".join(log_lines[last_log_line:]))
last_log_line = len(log_lines) - 1
progress = cls.progress(log_lines)
logging.info("Progress total: {}".format(progress))
needs_commit = False
if progress > query.progress:
query.progress = progress
session.commit()
needs_commit = True
if not tracking_url:
tracking_url = cls.get_tracking_url(log_lines)
if tracking_url:
logging.info(
"Found the tracking url: {}".format(tracking_url))
query.tracking_url = tracking_url
needs_commit = True
if needs_commit:
session.commit()
time.sleep(5)
polled = cursor.poll()

Expand Down
23 changes: 23 additions & 0 deletions superset/migrations/versions/ca69c70ec99b_tracking_url.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""tracking_url
Revision ID: ca69c70ec99b
Revises: a65458420354
Create Date: 2017-07-26 20:09:52.606416
"""

# revision identifiers, used by Alembic.
revision = 'ca69c70ec99b'
down_revision = 'a65458420354'

from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import mysql


def upgrade():
    """Add the nullable ``tracking_url`` text column to the ``query`` table."""
    op.add_column('query', sa.Column('tracking_url', sa.Text(), nullable=True))


def downgrade():
    """Revert the migration: drop ``tracking_url`` from the ``query`` table."""
    op.drop_column('query', 'tracking_url')
2 changes: 2 additions & 0 deletions superset/models/sql_lab.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class Query(Model):
start_running_time = Column(Numeric(precision=20, scale=6))
end_time = Column(Numeric(precision=20, scale=6))
end_result_backend_time = Column(Numeric(precision=20, scale=6))
tracking_url = Column(Text)

changed_on = Column(
DateTime,
Expand Down Expand Up @@ -119,6 +120,7 @@ def to_dict(self):
'user': self.user.username,
'limit_reached': self.limit_reached,
'resultsKey': self.results_key,
'trackingUrl': self.tracking_url,
}

@property
Expand Down
5 changes: 4 additions & 1 deletion superset/sql_lab.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ def handle_error(msg):
conn.close()
return handle_error(db_engine_spec.extract_error_message(e))

logging.info("Fetching cursor description")
cursor_description = cursor.description

conn.commit()
conn.close()

Expand All @@ -204,7 +207,7 @@ def handle_error(msg):
}, default=utils.json_iso_dttm_ser)

column_names = (
[col[0] for col in cursor.description] if cursor.description else [])
[col[0] for col in cursor_description] if cursor_description else [])
column_names = dedup(column_names)
cdf = dataframe.SupersetDataFrame(pd.DataFrame(
list(data), columns=column_names))
Expand Down
3 changes: 2 additions & 1 deletion superset/views/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ class SliceAddView(SliceModelView): # noqa

class DashboardModelView(SupersetModelView, DeleteMixin): # noqa
datamodel = SQLAInterface(models.Dashboard)

list_title = _('List Dashboards')
show_title = _('Show Dashboard')
add_title = _('Add Dashboard')
Expand Down Expand Up @@ -2030,6 +2030,7 @@ def sql_json(self):

# Async request.
if async:
logging.info("Running query on a Celery worker")
# Ignore the celery future object and the request may time out.
try:
sql_lab.get_sql_results.delay(
Expand Down
40 changes: 21 additions & 19 deletions tests/db_engine_specs_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,53 +5,55 @@

import unittest

from superset import db_engine_specs
from superset.db_engine_specs import HiveEngineSpec


class DbEngineSpecsTestCase(unittest.TestCase):
def test_0_progress(self):
log = """
17/02/07 18:26:27 INFO log.PerfLogger: <PERFLOG method=compile from=org.apache.hadoop.hive.ql.Driver>
17/02/07 18:26:27 INFO log.PerfLogger: <PERFLOG method=parse from=org.apache.hadoop.hive.ql.Driver>
"""
self.assertEquals(0, db_engine_specs.HiveEngineSpec.progress(log))
""".split('\n')
self.assertEquals(
0, HiveEngineSpec.progress(log))

def test_0_progress(self):
log = """
17/02/07 18:26:27 INFO log.PerfLogger: <PERFLOG method=compile from=org.apache.hadoop.hive.ql.Driver>
17/02/07 18:26:27 INFO log.PerfLogger: <PERFLOG method=parse from=org.apache.hadoop.hive.ql.Driver>
"""
self.assertEquals(0, db_engine_specs.HiveEngineSpec.progress(log))
""".split('\n')
self.assertEquals(
0, HiveEngineSpec.progress(log))

def test_number_of_jobs_progress(self):
log = """
17/02/07 19:15:55 INFO ql.Driver: Total jobs = 2
"""
self.assertEquals(0, db_engine_specs.HiveEngineSpec.progress(log))
""".split('\n')
self.assertEquals(0, HiveEngineSpec.progress(log))

def test_job_1_launched_progress(self):
log = """
17/02/07 19:15:55 INFO ql.Driver: Total jobs = 2
17/02/07 19:15:55 INFO ql.Driver: Launching Job 1 out of 2
"""
self.assertEquals(0, db_engine_specs.HiveEngineSpec.progress(log))
""".split('\n')
self.assertEquals(0, HiveEngineSpec.progress(log))

def test_job_1_launched_stage_1_0_progress(self):
log = """
17/02/07 19:15:55 INFO ql.Driver: Total jobs = 2
17/02/07 19:15:55 INFO ql.Driver: Launching Job 1 out of 2
17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-1 map = 0%, reduce = 0%
"""
self.assertEquals(0, db_engine_specs.HiveEngineSpec.progress(log))
""".split('\n')
self.assertEquals(0, HiveEngineSpec.progress(log))

def test_job_1_launched_stage_1_map_40_progress(self):
log = """
17/02/07 19:15:55 INFO ql.Driver: Total jobs = 2
17/02/07 19:15:55 INFO ql.Driver: Launching Job 1 out of 2
17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-1 map = 0%, reduce = 0%
17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-1 map = 40%, reduce = 0%
"""
self.assertEquals(10, db_engine_specs.HiveEngineSpec.progress(log))
""".split('\n')
self.assertEquals(10, HiveEngineSpec.progress(log))

def test_job_1_launched_stage_1_map_80_reduce_40_progress(self):
log = """
Expand All @@ -60,8 +62,8 @@ def test_job_1_launched_stage_1_map_80_reduce_40_progress(self):
17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-1 map = 0%, reduce = 0%
17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-1 map = 40%, reduce = 0%
17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-1 map = 80%, reduce = 40%
"""
self.assertEquals(30, db_engine_specs.HiveEngineSpec.progress(log))
""".split('\n')
self.assertEquals(30, HiveEngineSpec.progress(log))

def test_job_1_launched_stage_2_stages_progress(self):
log = """
Expand All @@ -72,8 +74,8 @@ def test_job_1_launched_stage_2_stages_progress(self):
17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-1 map = 80%, reduce = 40%
17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-2 map = 0%, reduce = 0%
17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-1 map = 100%, reduce = 0%
"""
self.assertEquals(12, db_engine_specs.HiveEngineSpec.progress(log))
""".split('\n')
self.assertEquals(12, HiveEngineSpec.progress(log))

def test_job_2_launched_stage_2_stages_progress(self):
log = """
Expand All @@ -83,5 +85,5 @@ def test_job_2_launched_stage_2_stages_progress(self):
17/02/07 19:15:55 INFO ql.Driver: Launching Job 2 out of 2
17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-1 map = 0%, reduce = 0%
17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-1 map = 40%, reduce = 0%
"""
self.assertEquals(60, db_engine_specs.HiveEngineSpec.progress(log))
""".split('\n')
self.assertEquals(60, HiveEngineSpec.progress(log))
5 changes: 1 addition & 4 deletions tests/sqllab_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,12 +189,9 @@ def test_search_query_on_time(self):
from_time = 'from={}'.format(int(first_query_time))
to_time = 'to={}'.format(int(second_query_time))
params = [from_time, to_time]
resp = self.get_resp('/superset/search_queries?'+'&'.join(params))
resp = self.get_resp('/superset/search_queries?' + '&'.join(params))
data = json.loads(resp)
self.assertEquals(2, len(data))
for k in data:
self.assertLess(int(first_query_time), k['startDttm'])
self.assertLess(k['startDttm'], int(second_query_time))

def test_alias_duplicate(self):
self.run_sql(
Expand Down