From a24f717a364b016d34936eec81071c3c97fb1ea0 Mon Sep 17 00:00:00 2001 From: Charlie Gu Date: Wed, 20 May 2020 14:41:24 -0400 Subject: [PATCH] Adding exporting functionality to scheduled data docs --- .../datadoc_completion_notification.html | 16 ++ datahub/server/app/auth/google_auth.py | 13 +- datahub/server/app/db.py | 21 +- datahub/server/app/flask_app.py | 8 +- datahub/server/clients/google_client.py | 13 + datahub/server/const/data_doc.py | 3 +- datahub/server/const/schedule.py | 6 + datahub/server/datasources/datadoc.py | 99 ++++--- datahub/server/datasources/query_execution.py | 25 +- datahub/server/lib/export/all_exporters.py | 4 +- datahub/server/lib/export/base_exporter.py | 76 +++-- .../server/lib/export/exporters/__init__.py | 13 + .../lib/export/exporters/gspread_exporter.py | 228 +++++++++++++++ .../lib/export/exporters/python_exporter.py | 15 +- .../server/lib/export/exporters/r_exporter.py | 15 +- datahub/server/lib/form/__init__.py | 56 ++++ datahub/server/lib/notification.py | 13 +- datahub/server/logic/schedule.py | 24 +- datahub/server/logic/user.py | 36 +++ datahub/server/tasks/log_query_per_table.py | 4 +- datahub/server/tasks/run_datadoc.py | 209 ++++++++++++-- datahub/server/tasks/run_query.py | 2 +- .../__init__.py | 0 .../test_export/test_exporters/__init__.py | 0 .../test_exporters/test_gspread_exporter.py | 35 +++ datahub/tests/test_lib/test_form/__init__.py | 0 .../tests/test_lib/test_form/test__init__.py | 106 +++++++ .../test_lib/test_query_analysis/__init__.py | 0 .../test_lineage.py | 0 .../test_samples.py | 0 .../test_templating.py | 0 .../components/AppAdmin/AdminMetastore.tsx | 4 +- .../webapp/components/AppAdmin/AdminTask.tsx | 25 +- datahub/webapp/components/DataDoc/DataDoc.tsx | 6 - .../DataDocChartCell/DataDocChartComposer.tsx | 31 +-- .../DataDocRightSidebar.tsx | 8 - .../DataDocSchedule/DataDocSchedule.tsx | 48 ++-- .../DataDocSchedule/DataDocScheduleForm.tsx | 262 +++++++++++++++--- .../DataDocScheduleRunLogs.tsx | 2 +- .../ResultExportDropdown.tsx | 9 +- datahub/webapp/components/Task/TaskEditor.tsx | 4 +- datahub/webapp/const/schedule.ts | 22 ++ datahub/webapp/lib/result-export.ts | 41 +++ datahub/webapp/redux/dataDoc/selector.ts | 16 +- .../webapp/redux/queryExecutions/action.ts | 4 +- .../webapp/redux/queryExecutions/reducer.ts | 2 +- datahub/webapp/redux/queryExecutions/types.ts | 10 +- .../ui/DisabledSection/DisabledSection.tsx | 14 + datahub/webapp/ui/JsonViewer/JsonViewer.tsx | 2 +- .../SimpleReactSelect/SimpleReactSelect.tsx | 7 +- datahub/webapp/ui/SmartForm/formFunctions.ts | 5 +- docker-compose.yml | 2 +- requirements/base.txt | 10 +- 53 files changed, 1298 insertions(+), 276 deletions(-) create mode 100644 datahub/email_templates/datadoc_completion_notification.html create mode 100644 datahub/server/lib/export/exporters/gspread_exporter.py rename datahub/tests/test_lib/{query_analysis => test_export}/__init__.py (100%) create mode 100644 datahub/tests/test_lib/test_export/test_exporters/__init__.py create mode 100644 datahub/tests/test_lib/test_export/test_exporters/test_gspread_exporter.py create mode 100644 datahub/tests/test_lib/test_form/__init__.py create mode 100644 datahub/tests/test_lib/test_form/test__init__.py create mode 100644 datahub/tests/test_lib/test_query_analysis/__init__.py rename datahub/tests/test_lib/{query_analysis => test_query_analysis}/test_lineage.py (100%) rename datahub/tests/test_lib/{query_analysis => test_query_analysis}/test_samples.py (100%) rename datahub/tests/test_lib/{query_analysis => 
test_query_analysis}/test_templating.py (100%) create mode 100644 datahub/webapp/lib/result-export.ts create mode 100644 datahub/webapp/ui/DisabledSection/DisabledSection.tsx diff --git a/datahub/email_templates/datadoc_completion_notification.html b/datahub/email_templates/datadoc_completion_notification.html new file mode 100644 index 000000000..162d2ab04 --- /dev/null +++ b/datahub/email_templates/datadoc_completion_notification.html @@ -0,0 +1,16 @@ + + + + + + + +

+ {{ message }} +

+ + diff --git a/datahub/server/app/auth/google_auth.py b/datahub/server/app/auth/google_auth.py index e45cfa5ca..d34a6ee48 100644 --- a/datahub/server/app/auth/google_auth.py +++ b/datahub/server/app/auth/google_auth.py @@ -1,15 +1,12 @@ -import requests from app.auth.oauth_auth import OAuthLoginManager, OAUTH_CALLBACK_PATH from env import DataHubSettings - -GOOGLE_AUTH_CONFIG = "https://accounts.google.com/.well-known/openid-configuration" +from clients.google_client import get_google_oauth_config class GoogleLoginManager(OAuthLoginManager): @property def oauth_config(self): - if not hasattr(self, "_cached_google_config"): - self._cached_google_config = requests.get(GOOGLE_AUTH_CONFIG).json() + google_config = get_google_oauth_config() return { "callback_url": "{}{}".format( @@ -17,9 +14,9 @@ def oauth_config(self): ), "client_id": DataHubSettings.OAUTH_CLIENT_ID, "client_secret": DataHubSettings.OAUTH_CLIENT_SECRET, - "authorization_url": self._cached_google_config["authorization_endpoint"], - "token_url": self._cached_google_config["token_endpoint"], - "profile_url": self._cached_google_config["userinfo_endpoint"], + "authorization_url": google_config["authorization_endpoint"], + "token_url": google_config["token_endpoint"], + "profile_url": google_config["userinfo_endpoint"], "scope": [ "https://www.googleapis.com/auth/userinfo.email", "openid", diff --git a/datahub/server/app/db.py b/datahub/server/app/db.py index 7158eacf9..34721e614 100644 --- a/datahub/server/app/db.py +++ b/datahub/server/app/db.py @@ -77,22 +77,21 @@ def func(*args, **kwargs): session = get_session()() kwargs["session"] = session - try: - return fn(*args, **kwargs) - except SQLAlchemyError as e: - if session: + if session is not None: + try: + return fn(*args, **kwargs) + except SQLAlchemyError as e: session.rollback() - # TODO: Log the sqlalchemy error? import traceback LOG.error(traceback.format_exc()) - else: raise e - finally: - # If we created the session, close it. - if session: + finally: + # Since we created the session, close it. get_session().remove() + else: + return fn(*args, **kwargs) return func @@ -110,13 +109,13 @@ def DBSession(): session = get_session()() try: yield session - except SQLAlchemyError: + except SQLAlchemyError as e: session.rollback() - # TODO: Log the sqlalchemy error? 
        import traceback

        LOG.error(traceback.format_exc())
+        raise e
     finally:
         get_session().remove()
diff --git a/datahub/server/app/flask_app.py b/datahub/server/app/flask_app.py
index ca39387f8..695ea1e8e 100644
--- a/datahub/server/app/flask_app.py
+++ b/datahub/server/app/flask_app.py
@@ -1,7 +1,7 @@
 import sys

 from celery import Celery
-from flask import Flask, Blueprint, json as flask_json, has_app_context
+from flask import Flask, Blueprint, json as flask_json, has_request_context
 from flask_socketio import SocketIO
 from flask_login import current_user
 from flask_limiter import Limiter
@@ -77,9 +77,9 @@ class ContextTask(TaskBase):
         abstract = True

         def __call__(self, *args, **kwargs):
-            # If app context is already present then call the function
-            # app context is provided if the task run sychronously
-            if has_app_context():
+            # If request context is already present then the celery task is called
+            # synchronously in a request, so no need to generate a new app context
+            if has_request_context():
                 return TaskBase.__call__(self, *args, **kwargs)
             # Otherwise in worker, we create the context and run
             with app.app_context():
diff --git a/datahub/server/clients/google_client.py b/datahub/server/clients/google_client.py
index 883bb8a04..bc7c22ac2 100644
--- a/datahub/server/clients/google_client.py
+++ b/datahub/server/clients/google_client.py
@@ -3,6 +3,8 @@
 from datetime import datetime
 from urllib.parse import quote

+import requests
+
 from env import DataHubSettings
 from .common import ChunkReader, FileDoesNotExist
 from lib.utils.utils import DATETIME_TO_UTC
@@ -26,6 +28,17 @@ def get_google_credentials(creds_info=None):
     return credentials


+GOOGLE_AUTH_CONFIG = "https://accounts.google.com/.well-known/openid-configuration"
+_cached_google_oauth_config = None
+
+
+def get_google_oauth_config():
+    global _cached_google_oauth_config
+    if _cached_google_oauth_config is None:
+        _cached_google_oauth_config = requests.get(GOOGLE_AUTH_CONFIG).json()
+    return _cached_google_oauth_config
+
+
 # Reference used: https://dev.to/sethmlarson/python-data-streaming-to-google-cloud-storage-with-resumable-uploads-458h
 class GoogleUploadClient(object):
     def __init__(
diff --git a/datahub/server/const/data_doc.py b/datahub/server/const/data_doc.py
index 9e9dc66ee..03dfba0e2 100644
--- a/datahub/server/const/data_doc.py
+++ b/datahub/server/const/data_doc.py
@@ -1,8 +1,7 @@
 from enum import Enum

-# KEEP IT CONSISTENT AS config/datadoc.yaml
-
+# KEEP IT CONSISTENT WITH config/datadoc.yaml
 class DataCellType(Enum):
     query = 0
     text = 1
diff --git a/datahub/server/const/schedule.py b/datahub/server/const/schedule.py
index cb073157c..e5146ea00 100644
--- a/datahub/server/const/schedule.py
+++ b/datahub/server/const/schedule.py
@@ -5,3 +5,9 @@ class TaskRunStatus(Enum):
     RUNNING = 0
     SUCCESS = 1
     FAILURE = 2
+
+
+class NotifyOn(Enum):
+    ALL = 0
+    ON_FAILURE = 1
+    ON_SUCCESS = 2
diff --git a/datahub/server/datasources/datadoc.py b/datahub/server/datasources/datadoc.py
index 07d52d895..616d9088b 100644
--- a/datahub/server/datasources/datadoc.py
+++ b/datahub/server/datasources/datadoc.py
@@ -13,6 +13,8 @@
 from env import DataHubSettings
 from lib.celery.cron import validate_cron
+from lib.form import validate_form
+from lib.export.all_exporters import get_exporter
 from lib.notification import simple_email, render_html
 from lib.logger import get_logger
@@ -24,8 +26,8 @@
 )
 from logic.datadoc_permission import assert_can_read, assert_can_write
 from logic.query_execution import get_query_execution_by_id
+from logic.schedule import run_and_log_scheduled_task
 from models.environment import Environment
-from tasks.run_datadoc import run_datadoc

 LOG = get_logger(__file__)
@@ -243,27 +245,6 @@ def delete_favorite_data_doc(
     logic.unfavorite_data_doc(data_doc_id=data_doc_id, uid=uid)


-@register("/datadoc/<int:id>/run/", methods=["GET"])
-def get_datadoc_schedule_run(id):
-    with DBSession() as session:
-        assert_can_read(id, session=session)
-        verify_data_doc_permission(id, session=session)
-
-        runs, _ = schedule_logic.get_task_run_record_run_by_name(
-            name=get_data_doc_schedule_name(id), session=session
-        )
-        return runs
-
-
-@register("/datadoc/<int:id>/run/", methods=["POST"])
-def run_data_doc(id):
-    with DBSession() as session:
-        assert_can_write(id, session=session)
-        verify_data_doc_permission(id, session=session)
-
-        run_datadoc.apply_async(args=[id])
-
-
 def get_data_doc_schedule_name(id: int):
     return f"run_data_doc_{id}"

@@ -278,38 +259,61 @@ def get_datadoc_schedule(id):
         return schedule_logic.get_task_schedule_by_name(schedule_name, session=session)


+def validate_datadoc_schedule_kwargs(kwargs):
+    allowed_keys = [
+        "notify_with",
+        "notify_on",
+        "exporter_cell_id",
+        "exporter_name",
+        "exporter_params",
+    ]
+    for key in kwargs.keys():
+        api_assert(key in allowed_keys, "Invalid field {}".format(key))
+    if "exporter_name" in kwargs:
+        exporter_name = kwargs["exporter_name"]
+        exporter = get_exporter(exporter_name)
+        api_assert(exporter is not None, "Invalid exporter {}".format(exporter_name))
+
+        exporter_params = kwargs.get("exporter_params", {})
+        exporter_form = exporter.export_form
+        if not (exporter_form is None and not exporter_params):
+            valid, reason = validate_form(exporter_form, exporter_params)
+            api_assert(valid, "Invalid exporter params, reason: " + reason)
+
+
 @register("/datadoc/<int:id>/schedule/", methods=["POST"])
 def create_datadoc_schedule(
-    id, cron,
+    id, cron, kwargs,
 ):
-    schedule_name = get_data_doc_schedule_name(id)
+    validate_datadoc_schedule_kwargs(kwargs)
+    api_assert(validate_cron(cron), "Invalid cron expression")

+    schedule_name = get_data_doc_schedule_name(id)
     with DBSession() as session:
         assert_can_write(id, session=session)
         data_doc = logic.get_data_doc_by_id(id, session=session)
         verify_environment_permission([data_doc.environment_id])

-        api_assert(validate_cron(cron), "Invalid cron expression")
-
         return schedule_logic.create_task_schedule(
             schedule_name,
             "tasks.run_datadoc.run_datadoc",
             cron=cron,
-            kwargs={"doc_id": id},
+            kwargs={**kwargs, "user_id": current_user.id, "doc_id": id},
             task_type="user",
             session=session,
         )


 @register("/datadoc/<int:id>/schedule/", methods=["PUT"])
-def update_datadoc_schedule(
-    id, cron=None, enabled=None,
-):
+def update_datadoc_schedule(id, cron=None, enabled=None, kwargs=None):
+    if kwargs is not None:
+        validate_datadoc_schedule_kwargs(kwargs)
+    if cron is not None:
+        api_assert(validate_cron(cron), "Invalid cron expression")
+
     schedule_name = get_data_doc_schedule_name(id)
     with DBSession() as session:
         assert_can_write(id, session=session)
-        if cron is not None:
-            api_assert(validate_cron(cron), "Invalid cron expression")

         schedule = schedule_logic.get_task_schedule_by_name(
             schedule_name, session=session
         )
@@ -322,6 +326,12 @@ def update_datadoc_schedule(
             updated_fields["cron"] = cron
         if enabled is not None:
             updated_fields["enabled"] = enabled
+        if kwargs is not None:
+            updated_fields["kwargs"] = {
+                **kwargs,
+                "user_id": current_user.id,
+                "doc_id": id,
+            }

         return schedule_logic.update_task_schedule(
             schedule.id, session=session, **updated_fields,
         )
@@ -342,6 +352,31 @@ def delete_datadoc_schedule(id):
         schedule_logic.delete_task_schedule(schedule.id, session=session)


+@register("/datadoc/<int:id>/schedule/logs/", methods=["GET"])
+def get_datadoc_schedule_run(id):
+    with DBSession() as session:
+        assert_can_read(id, session=session)
+        verify_data_doc_permission(id, session=session)
+
+        runs, _ = schedule_logic.get_task_run_record_run_by_name(
+            name=get_data_doc_schedule_name(id), session=session
+        )
+        return runs
+
+
+@register("/datadoc/<int:id>/schedule/run/", methods=["POST"])
+def run_data_doc(id):
+    schedule_name = get_data_doc_schedule_name(id)
+    with DBSession() as session:
+        assert_can_write(id, session=session)
+        verify_data_doc_permission(id, session=session)
+        schedule = schedule_logic.get_task_schedule_by_name(
+            schedule_name, session=session
+        )
+        api_assert(schedule, "Schedule does not exist")
+        run_and_log_scheduled_task(schedule.id, session=session)
+
+
 @register("/datadoc/<int:doc_id>/editor/", methods=["GET"])
 def get_datadoc_editors(doc_id):
     return logic.get_data_doc_editors_by_doc_id(doc_id)
diff --git a/datahub/server/datasources/query_execution.py b/datahub/server/datasources/query_execution.py
index 5563ccd83..419d2637f 100644
--- a/datahub/server/datasources/query_execution.py
+++ b/datahub/server/datasources/query_execution.py
@@ -16,7 +16,7 @@
     verify_query_engine_permission,
 )
 from clients.s3_client import FileDoesNotExist
-from lib.export.all_exporters import ALL_EXPORTERS, get_exporter_class
+from lib.export.all_exporters import ALL_EXPORTERS, get_exporter
 from lib.result_store import GenericReader
 from lib.query_analysis.templating import (
     render_templated_query,
@@ -344,13 +344,24 @@ def delete_query_execution_notification(
     )


-@register("/statement_execution_exporter/", methods=["GET"], require_auth=True)
-def get_all_statement_execution_exporters():
+@register("/query_execution_exporter/", methods=["GET"], require_auth=True)
+def get_all_query_result_exporters():
     return ALL_EXPORTERS


 @register(
-    "/statement_execution/<int:statement_execution_id>/export/",
+    "/query_execution_exporter/auth/",
     methods=["GET"],
+)
+def export_statement_execution_acquire_auth(export_name):
+    exporter = get_exporter(export_name)
+    api_assert(exporter is not None, f"Invalid export name {export_name}")
+    if not exporter.requires_auth:
+        return None
+    return exporter.acquire_auth(current_user.id)
+
+
+@register(
+    "/query_execution_exporter/statement_execution/<int:statement_execution_id>/",
     methods=["GET"],
     require_auth=True,
 )
@@ -366,9 +377,9 @@ def export_statement_execution_result(statement_execution_id, export_name):
             statement_execution.query_execution_id, session=session
         )

-    exporter_class = get_exporter_class(export_name)
-    api_assert(exporter_class is not None, f"Invalid export name {export_name}")
-    return exporter_class.export(statement_execution_id, current_user.id)
+    exporter = get_exporter(export_name)
+    api_assert(exporter is not None, f"Invalid export name {export_name}")
+    return exporter.export(statement_execution_id, current_user.id)


 @register("/query_execution/templated_query/", methods=["POST"], require_auth=True)
diff --git a/datahub/server/lib/export/all_exporters.py b/datahub/server/lib/export/all_exporters.py
index b76500574..3e5c54161 100644
--- a/datahub/server/lib/export/all_exporters.py
+++ b/datahub/server/lib/export/all_exporters.py
@@ -6,8 +6,8 @@
 ALL_EXPORTERS = ALL_PLUGIN_EXPORTERS


-def get_exporter_class(name: str):
+def get_exporter(name: str):
     for exporter in ALL_EXPORTERS:
-        if exporter.EXPORTER_NAME() == name:
+        if exporter.exporter_name == name:
             return exporter
     raise ValueError(f"Unknown exporter name {name}")
diff --git a/datahub/server/lib/export/base_exporter.py b/datahub/server/lib/export/base_exporter.py
index 4c8249b8a..fe51cec1d 100644
--- a/datahub/server/lib/export/base_exporter.py
+++ b/datahub/server/lib/export/base_exporter.py
@@ -1,41 +1,74 @@
-from abc import ABCMeta, abstractclassmethod
+from abc import ABCMeta, abstractmethod

 from logic import query_execution as logic
 from lib.result_store import GenericReader


 class BaseExporter(metaclass=ABCMeta):
-    @abstractclassmethod
-    def EXPORTER_NAME(cls) -> str:
+    @property
+    @abstractmethod
+    def exporter_name(self) -> str:
         """Name of the exporter that will be shown on the frontend
         """
         raise NotImplementedError()

-    @abstractclassmethod
-    def EXPORTER_TYPE(cls):
-        # Can be one of 'url' or 'text'
-        # Both returns a string for upload but
+    @property
+    @abstractmethod
+    def exporter_type(self) -> str:
+        # Can be one of 'url' | 'text' | 'none'
         # Url exports returns a url for user to open
         # Text exports opens up a copy paste modal for user to copy
+        # None returns nothing since the result is exported without anything to track
         raise NotImplementedError()

-    @abstractclassmethod
-    def export(cls, statement_execution_id: int, uid: int) -> str:
+    @property
+    def requires_auth(self) -> bool:
+        # Make this method return true if additional auth flow is needed for it to work
+        return False
+
+    def acquire_auth(self, uid: int) -> str:
+        """Implement this method if requires_auth is True
+        Use this method to redirect user to the oauth url
+
+        Arguments:
+            uid {int} -- id of the user going through the auth flow
+
+        Returns:
+            str -- Redirection url to the google oauth
+            None -- if no redirection is needed
+        """
+        raise NotImplementedError()
+
+    @property
+    def export_form(self):
+        """Return the form field for additional options for export
+        Note that all options need to be optional.
+        Returns None if nothing is to be filled
+
+        Returns:
+            StructFormField -- The form value that indicates
+                               the key value to enter
+        """
+        return None
+
+    @abstractmethod
+    def export(self, statement_execution_id: int, uid: int, **options) -> str:
         """This function exports the query results of statement_execution_id
            to given output

         Arguments:
             statement_execution_id {[number]}
             uid {[number]} -- user who requested access
+            options {[Dict]} -- optional additional options, note they must be optional
+                                since they may not always be provided

         Returns:
             str -- String for frontend to display
                    Behavior noted by EXPORTER_TYPE
         """
         raise NotImplementedError()

-    @classmethod
-    def get_statement_execution_result(
-        cls,
+    def _get_statement_execution_result(
+        self,
         statement_execution_id: int,
         raw: bool = False,  # If raw, return unparsed csv text
+        number_of_lines: int = 2001,
     ):
         statement_execution = logic.get_statement_execution_by_id(
             statement_execution_id
         )
         if statement_execution.result_path:
             with GenericReader(statement_execution.result_path) as reader:
                 if raw:
-                    result = "\n".join(reader.read_lines(number_of_lines=2001))
+                    result = "\n".join(
+                        reader.read_lines(number_of_lines=number_of_lines)
+                    )
                 else:
-                    result = reader.read_csv(number_of_lines=2001)
+                    result = reader.read_csv(number_of_lines=number_of_lines)
                 return result
         return None

-    @classmethod
-    def get_statement_execution_download_url(cls, statement_execution_id: int):
+    def _get_statement_execution_download_url(self, statement_execution_id: int):
         statement_execution = logic.get_statement_execution_by_id(
             statement_execution_id
         )
@@ -60,6 +94,10 @@
             return reader.get_download_url()
         return None

-    @classmethod
-    def to_dict(cls):
-        return {"name": cls.EXPORTER_NAME(), "type": cls.EXPORTER_TYPE()}
+    def to_dict(self):
+        return {
+            "name": self.exporter_name,
+            "type": self.exporter_type,
+            "requires_auth": self.requires_auth,
+            "form": self.export_form,
+        }
diff --git a/datahub/server/lib/export/exporters/__init__.py b/datahub/server/lib/export/exporters/__init__.py
index e69de29bb..ef955c74f 100644
--- a/datahub/server/lib/export/exporters/__init__.py
+++ b/datahub/server/lib/export/exporters/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2020 Pinterest, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/datahub/server/lib/export/exporters/gspread_exporter.py b/datahub/server/lib/export/exporters/gspread_exporter.py
new file mode 100644
index 000000000..eba8fa2d0
--- /dev/null
+++ b/datahub/server/lib/export/exporters/gspread_exporter.py
@@ -0,0 +1,228 @@
+# Copyright 2020 Pinterest, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import re +from typing import Tuple + +from flask import request +from flask_login import current_user +from google_auth_oauthlib.flow import Flow +from google.oauth2.credentials import Credentials +from google.auth.exceptions import RefreshError +import gspread + +# import requests + +from app.flask_app import flask_app +from env import DataHubSettings +from logic.user import get_user_by_id, update_user_properties +from lib.export.base_exporter import BaseExporter +from lib.form import StructFormField, FormField + + +class UserTokenNotFound(Exception): + pass + + +SCOPES = [ + "https://www.googleapis.com/auth/drive", + "https://www.googleapis.com/auth/spreadsheets", +] + +GSPREAD_OAUTH_CALLBACK = "/gspread_oauth2callback" + + +_google_flow = None + + +def create_google_flow(google_client_config): + global _google_flow + if _google_flow is None: + _google_flow = Flow.from_client_config( + google_client_config, + scopes=SCOPES, + redirect_uri="{}{}".format( + DataHubSettings.PUBLIC_URL, GSPREAD_OAUTH_CALLBACK + ), + ) + + +@flask_app.route(GSPREAD_OAUTH_CALLBACK) +def gspread_oauth_call_back(): + try: + code = request.args.get("code") + token = _google_flow.fetch_token(code=code) + update_user_properties(current_user.id, gspread_token=token) + + except Exception as e: + return """ + Failed to obtain credentials, reason: {} + """.format( + str(e) + ) + + return """ +

Success! Please close the tab.

+ + """ + + +class GoogleSheetsExporter(BaseExporter): + def __init__(self, google_client_config): + super(GoogleSheetsExporter, self).__init__() + self._google_client_config = google_client_config + create_google_flow(google_client_config) + + @property + def exporter_name(self): + return "Export Preview to Google Sheets" + + @property + def exporter_type(self): + return "url" + + @property + def requires_auth(self): + return True + + @property + def export_form(self): + return StructFormField( + sheet_url=FormField( + description="Optional, if not provided a new sheet will be created." + ), + worksheet_title=FormField( + description='Defaults to "Sheet1"', + helper="Title of the worksheet, if not found then a sheet will be created", + ), + start_cell=FormField( + description="The top left cell position where data will be filled. Defaults to A1", + regex="^[A-Z]{1,3}[1-9][0-9]*$", + ), + ) + + def export( + self, + statement_execution_id, + uid, + sheet_url=None, + worksheet_title="Sheet1", + start_cell="A1", + ): + try: + credentials = self.get_credentials(uid) + gc = gspread.authorize(credentials) + sheet = ( + gc.create(f"DataHub Result {statement_execution_id}") + if sheet_url is None + else gc.open_by_url(sheet_url) + ) + # Default case where only upload is needed + if sheet_url is None and worksheet_title == "Sheet1" and start_cell == "A1": + raw_csv = self._get_statement_execution_result( + statement_execution_id, raw=True + ) + gc.import_csv(sheet.id, raw_csv) + else: + csv = self._get_statement_execution_result(statement_execution_id) + + if len(csv): + num_rows = len(csv) + num_cols = len(csv[0]) + + worksheet = None + try: + worksheet = sheet.worksheet(worksheet_title) + except gspread.exceptions.WorksheetNotFound: + worksheet = sheet.add_worksheet( + worksheet_title, max(num_rows, 1000), max(num_cols, 26) + ) + + if len(csv): + start_cell_coord = worksheet_coord_to_coord(start_cell) + end_cell = coord_to_worksheet_coord( + start_cell_coord[0] + num_cols, + start_cell_coord[1] + num_rows, + ) + worksheet.update("{}:{}".format(start_cell, end_cell), csv) + + return f"https://docs.google.com/spreadsheets/d/{sheet.id}" + except RefreshError: + # Invalidate user access token + update_user_properties(current_user.id, gspread_token=None) + # Continue to raise the error for the frontend client to see + raise Exception("Invalid Google credentials, please try again.") + + def get_credentials(self, uid): + user = get_user_by_id(uid) + if not (user and "gspread_token" in user.properties): + raise UserTokenNotFound() + token = user.properties["gspread_token"] + + client_config = _google_flow.client_config + credentials = Credentials( + token["access_token"], + refresh_token=token.get("refresh_token"), + id_token=token.get("id_token"), + token_uri=client_config.get("token_uri"), + client_id=client_config.get("client_id"), + client_secret=client_config.get("client_secret"), + scopes=SCOPES, + ) + + credentials.expiry = datetime.datetime.utcfromtimestamp(token["expires_at"]) + + return credentials + + def acquire_auth(self, uid: int): + try: + self.get_credentials(uid) + return None + except UserTokenNotFound: + # If user token is not found, go through the oauth process + auth_url, _ = _google_flow.authorization_url( + access_type="offline", prompt="consent" + ) + return auth_url + + +ordAMinusOne = ord("A") - 1 + + +def worksheet_coord_to_coord(worksheet_coord: str) -> Tuple[int, int]: + match = re.match(r"^([A-Za-z]+)([1-9][0-9]*)$", worksheet_coord) + col = match.group(1).upper() + row = 
match.group(2) + + num_row = int(row) + + num_col = 0 + for i, ch in enumerate(reversed(col)): + num_col += (ord(ch) - ordAMinusOne) * (26 ** i) + return num_col, num_row + + +def coord_to_worksheet_coord(col: int, row: int) -> str: + str_row = str(row) + str_col = "" + while col > 0: + col, remainder = divmod(col, 26) + if remainder == 0: + remainder = 26 + col -= 1 + str_col = chr(ordAMinusOne + remainder) + str_col + return str_col + str_row diff --git a/datahub/server/lib/export/exporters/python_exporter.py b/datahub/server/lib/export/exporters/python_exporter.py index adfe040e8..b25f63cf0 100644 --- a/datahub/server/lib/export/exporters/python_exporter.py +++ b/datahub/server/lib/export/exporters/python_exporter.py @@ -2,17 +2,18 @@ class PythonExporter(BaseExporter): - @classmethod - def EXPORTER_NAME(cls): + @property + def exporter_name(self): return "Export to Python" - @classmethod - def EXPORTER_TYPE(cls): + @property + def exporter_type(self): return "text" - @classmethod - def export(cls, statement_execution_id, uid): - download_url = cls.get_statement_execution_download_url(statement_execution_id) + def export(self, statement_execution_id, uid): + download_url = self._get_statement_execution_download_url( + statement_execution_id + ) return """ url = "{}" diff --git a/datahub/server/lib/export/exporters/r_exporter.py b/datahub/server/lib/export/exporters/r_exporter.py index 19123694a..4e463acaa 100644 --- a/datahub/server/lib/export/exporters/r_exporter.py +++ b/datahub/server/lib/export/exporters/r_exporter.py @@ -2,17 +2,18 @@ class RExporter(BaseExporter): - @classmethod - def EXPORTER_NAME(cls): + @property + def exporter_name(self): return "Export to R" - @classmethod - def EXPORTER_TYPE(cls): + @property + def exporter_type(self): return "text" - @classmethod - def export(cls, statement_execution_id, uid): - download_url = cls.get_statement_execution_download_url(statement_execution_id) + def export(self, statement_execution_id, uid): + download_url = self._get_statement_execution_download_url( + statement_execution_id + ) return """ library(tidyverse) diff --git a/datahub/server/lib/form/__init__.py b/datahub/server/lib/form/__init__.py index fd0f5154d..a921a0edc 100644 --- a/datahub/server/lib/form/__init__.py +++ b/datahub/server/lib/form/__init__.py @@ -1,3 +1,4 @@ +import re from abc import ABCMeta, abstractmethod from typing import Dict, Union from enum import Enum @@ -96,3 +97,58 @@ def to_dict(self): AllFormField = Union[FormField, ExpandableFormField, StructFormField] + + +def validate_form(form: AllFormField, form_value) -> [bool, str]: + """Checks if the form is valid + + Arguments: + form {AllFormField} -- The form structure + form_value {Any} -- The corresponding form value + + Returns: + [bool, str] -- True if valid otherwise False, and the reason why it's invalid + """ + if isinstance(form, StructFormField): + if not isinstance(form_value, dict): + return False, "Field value is not a dictionary" + for key, subform in form.kwargs.items(): + valid, reason = validate_form(subform, form_value.get(key, None)) + if not valid: + return valid, reason + return True, "" + elif isinstance(form, ExpandableFormField): + if not isinstance(form_value, list): + return False, "Field value is not an array" + if form.min is not None and len(form_value) < form.min: + return False, "Field value less than allowed length" + if form.max is not None and len(form_value) > form.max: + return False, "Field value more than allowed length" + for child_form_value in form_value: + 
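+            # Each element of the list is validated recursively against the `of` sub-form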
valid, reason = validate_form(form.of, child_form_value) + if not valid: + return valid, reason + return True, "" + + elif isinstance(form, FormField): + if form_value is None: + if form.required: + return False, "Required field is missing" + return True, "" + + if form.field_type == FormFieldType.String: + if not isinstance(form_value, str): + return False, "Field value is not a string" + if form.regex is not None: + if not re.match(form.regex, form_value): + return False, "Field value does not match regex" + return True, "" + elif form.field_type == FormFieldType.Number: + if not isinstance(form_value, (int, float)): + return False, "Field value is not a number" + return True, "" + elif form.field_type == FormFieldType.Boolean: + if not isinstance(form_value, bool): + return False, "Field value is not a boolean" + return True, "" + return False, "Unexpected form type" diff --git a/datahub/server/lib/notification.py b/datahub/server/lib/notification.py index 58337f28f..f9ab2fed7 100644 --- a/datahub/server/lib/notification.py +++ b/datahub/server/lib/notification.py @@ -14,10 +14,15 @@ def send_slack_message(to, message, token=DataHubSettings.DATAHUB_SLACK_TOKEN): """Send Message to slack user/channel - Keyword arguments: - token: the api token - to: the user or channel message is going to - message: the actual message in raw text + Arguments: + to {str} -- the user or channel message is going to + message {str} -- the actual message in raw text + + Keyword Arguments: + token {str} -- the api token (default: {DataHubSettings.DATAHUB_SLACK_TOKEN}) + + Returns: + [requests.Response] -- The slack post request response """ url = "https://slack.com/api/chat.postMessage" diff --git a/datahub/server/logic/schedule.py b/datahub/server/logic/schedule.py index 9a7acc4b2..1fc97f766 100644 --- a/datahub/server/logic/schedule.py +++ b/datahub/server/logic/schedule.py @@ -177,6 +177,12 @@ def update_task_run_record(id, status=None, alerted=None, session=None): return run +@with_session +def create_task_run_record_for_celery_task(task, session=None): + job_name = task.request.get("shadow", task.name) + return create_task_run_record(name=job_name, session=session).id + + def with_task_logging( # TODO: add some alerting feature here ): @@ -187,25 +193,17 @@ def base_job_decorator(job_func): @wraps(job_func) def wrapper(self, *args, **kwargs): - record_dict = None + record_id = None try: - job_name = self.request.get("shadow", self.name) - - record_dict = create_task_run_record(name=job_name).to_dict() - + record_id = create_task_run_record_for_celery_task(self) result = job_func(self, *args, **kwargs) - - update_task_run_record( - id=record_dict["id"], status=TaskRunStatus.SUCCESS - ) + update_task_run_record(id=record_id, status=TaskRunStatus.SUCCESS) return result except Exception as e: logger.info(e) - if isinstance(record_dict, dict): - update_task_run_record( - id=record_dict.get("id"), status=TaskRunStatus.FAILURE - ) + if record_id is not None: + update_task_run_record(id=record_id, status=TaskRunStatus.FAILURE) raise e return wrapper diff --git a/datahub/server/logic/user.py b/datahub/server/logic/user.py index 2511d7637..f09a37b3d 100644 --- a/datahub/server/logic/user.py +++ b/datahub/server/logic/user.py @@ -84,6 +84,42 @@ def update_user(uid, commit=True, session=None, **kwargs): ) +@with_session +def update_user_properties(uid: int, commit=True, session=None, **properties): + """Update the properties field of user. 
Key-value pairs of properties
+    will be updated on the user; if a property's value is None, that property
+    will be deleted.
+
+    Arguments:
+        uid {int} -- The user Id
+
+    Keyword Arguments:
+        commit {bool} -- Whether or not to commit the change (default: {True})
+        session -- Sqlalchemy session, auto provided (default: {None})
+
+    Returns:
+        User -- The updated user object
+    """
+
+    user = get_user_by_id(uid, session=session)
+    assert user is not None
+    new_properties = {**user.properties}
+    for key, value in properties.items():
+        if value is None:
+            new_properties.pop(key, None)
+        else:
+            new_properties[key] = value
+    user.properties = new_properties
+
+    if commit:
+        session.commit()
+    else:
+        session.flush()
+    session.refresh(user)
+
+    return user
+
+
 @with_session
 def delete_user(uid, session=None):
     # user cannot be deleted
diff --git a/datahub/server/tasks/log_query_per_table.py b/datahub/server/tasks/log_query_per_table.py
index b0f7a06a8..12ca82ab7 100644
--- a/datahub/server/tasks/log_query_per_table.py
+++ b/datahub/server/tasks/log_query_per_table.py
@@ -13,8 +13,8 @@
 )


-@celery.task
-def log_query_per_table_task(query_execution_id):
+@celery.task(bind=True)
+def log_query_per_table_task(self, query_execution_id):
     with DBSession() as session:
         query_execution = qe_logic.get_query_execution_by_id(
             query_execution_id, session=session
diff --git a/datahub/server/tasks/run_datadoc.py b/datahub/server/tasks/run_datadoc.py
index a07f3fb11..f0753148e 100644
--- a/datahub/server/tasks/run_datadoc.py
+++ b/datahub/server/tasks/run_datadoc.py
@@ -3,21 +3,46 @@

 from app.flask_app import celery
 from app.db import DBSession

-from lib.query_analysis.templating import render_templated_query
 from const.data_doc import DataCellType
+from const.schedule import NotifyOn, TaskRunStatus
 from const.query_execution import QueryExecutionStatus
+from env import DataHubSettings
+from lib.export.all_exporters import get_exporter
+from lib.notification import simple_email, send_slack_message, render_html
+from lib.query_analysis.templating import render_templated_query
+from lib.logger import get_logger
-
-from logic.schedule import with_task_logging
+from logic.schedule import (
+    create_task_run_record_for_celery_task,
+    update_task_run_record,
+)
 from logic import datadoc as datadoc_logic
 from logic import query_execution as qe_logic
+from models.user import User

 from tasks.run_query import run_query_task

+LOG = get_logger(__file__)
+GENERIC_QUERY_FAILURE_MSG = "Execution did not finish successfully, workflow failed"


 @celery.task(bind=True)
-@with_task_logging()
-def run_datadoc(self, doc_id, user_id=None, *args, **kwargs):
+def run_datadoc(
+    self,
+    doc_id,
+    user_id=None,
+    # Notification related settings
+    notify_with=None,
+    notify_on=NotifyOn.ALL.value,
+    # Exporting related settings
+    exporter_cell_id=None,
+    exporter_name=None,
+    exporter_params={},
+    *args,
+    **kwargs,
+):
     tasks_to_run = []
+    record_id = None
+
     with DBSession() as session:
         data_doc = datadoc_logic.get_data_doc_by_id(doc_id, session=session)
         if not data_doc:
@@ -27,30 +52,53 @@ def run_datadoc(self, doc_id, user_id=None, *args, **kwargs):
         query_cells = [
             cell for cell in data_doc.cells if cell.cell_type == DataCellType.query
         ]
+        if exporter_cell_id is not None and not any(
+            cell.id == exporter_cell_id for cell in query_cells
+        ):
+            raise Exception("Invalid cell id for exporting")
+
+        # Prepping chain jobs; each unit is a [make_qe_task, run_query_task] combo
         for index, query_cell in enumerate(query_cells):
             query =
render_templated_query(query_cell.context, data_doc.meta) + make_query_execution_kwargs = { + "query": query, + "engine_id": query_cell.meta["engine"], + "cell_id": query_cell.id, + "uid": runner_id, + } + tasks_to_run.append( - make_query_execution_task.si( + _make_query_execution_task.si( prev_query_status=QueryExecutionStatus.DONE.value, - query=query, - engine_id=query_cell.meta["engine"], - cell_id=query_cell.id, - uid=runner_id, + **make_query_execution_kwargs, ) if index == 0 - else make_query_execution_task.s( - query=query, - engine_id=query_cell.meta["engine"], - cell_id=query_cell.id, - uid=runner_id, - ) + else _make_query_execution_task.s(**make_query_execution_kwargs) ) tasks_to_run.append(run_query_task.s()) - chain(*tasks_to_run).apply_async() + + # Create db entry record + record_id = create_task_run_record_for_celery_task(self, session=session) + + completion_task_kwargs = { + "doc_id": doc_id, + "user_id": user_id, + "record_id": record_id, + "notify_with": notify_with, + "notify_on": notify_on, + "exporter_name": exporter_name, + "exporter_params": exporter_params, + "exporter_cell_id": exporter_cell_id, + } + + chain(*tasks_to_run).apply_async( + link=on_datadoc_run_success.s(**completion_task_kwargs), + link_error=on_datadoc_run_failure.s(**completion_task_kwargs), + ) @celery.task(bind=True) -def make_query_execution_task( +def _make_query_execution_task( self, prev_query_status, query, engine_id, cell_id, uid, ): if prev_query_status == QueryExecutionStatus.DONE.value: @@ -64,4 +112,127 @@ def make_query_execution_task( ) return query_execution_id else: - raise Exception("Last execution did not finish successfully") + raise Exception(GENERIC_QUERY_FAILURE_MSG) + + +@celery.task +def on_datadoc_run_success( + last_query_status, **kwargs, +): + is_success = last_query_status == QueryExecutionStatus.DONE.value + error_msg = None if is_success else GENERIC_QUERY_FAILURE_MSG + return on_datadoc_completion(is_success=is_success, error_msg=error_msg, **kwargs) + + +@celery.task +def on_datadoc_run_failure( + request, exc, traceback, **kwargs, +): + error_msg = "DataDoc failed to run. 
Task {0!r} raised error: {1!r}".format(
+        request.id, exc
+    )
+    return on_datadoc_completion(is_success=False, error_msg=error_msg, **kwargs)
+
+
+def on_datadoc_completion(
+    doc_id,
+    user_id,
+    record_id,
+    # Notification settings
+    notify_with,
+    notify_on,
+    # Export settings
+    exporter_cell_id,
+    exporter_name,
+    exporter_params,
+    # Success/Failure handling
+    is_success,
+    error_msg=None,
+):
+    try:
+        update_task_run_record(
+            id=record_id,
+            status=TaskRunStatus.SUCCESS if is_success else TaskRunStatus.FAILURE,
+        )
+
+        # Export query results
+        export_url = None
+        if is_success and exporter_cell_id is not None:
+            statement_execution_id = None
+            with DBSession() as session:
+                cell = datadoc_logic.get_data_cell_by_id(
+                    exporter_cell_id, session=session
+                )
+                assert cell and len(cell.query_executions) > 0
+                query_execution = cell.query_executions[0]
+                statement_execution_id = query_execution.statement_executions[-1].id
+            if statement_execution_id is not None:
+                exporter = get_exporter(exporter_name)
+                export_url = exporter.export(
+                    statement_execution_id, user_id, **(exporter_params or {})
+                )
+
+        # Send user Notification
+        should_notify = notify_on == NotifyOn.ALL.value or (
+            (notify_on == NotifyOn.ON_SUCCESS.value and is_success)
+            or (notify_on == NotifyOn.ON_FAILURE.value and not is_success)
+        )
+        if should_notify and notify_with is not None:
+            msg_header, msg_content = create_completion_message(
+                doc_id,
+                is_success,
+                export_url,
+                use_html=(notify_with == "email"),
+                error_msg=error_msg,
+            )
+            user = User.get(id=user_id)
+            if notify_with == "slack":
+                send_slack_message(
+                    to=f"@{user.username}",
+                    message="{}\n{}".format(msg_header, msg_content),
+                )
+            elif notify_with == "email":
+                html = render_html(
+                    "datadoc_completion_notification.html", dict(message=msg_content),
+                )
+                simple_email(
+                    msg_header, html, to_email=user.email,
+                )
+    except Exception as e:
+        is_success = False
+        # error_msg = str(e)
+        LOG.error(e, exc_info=True)
+
+    return is_success
+
+
+def create_completion_message(
+    doc_id, is_success, export_url, use_html=False, error_msg=None
+):
+    def htmlize(url: str):
+        if use_html and url:
+            return '<a href="{url}">{url}</a>'.format(url=url)
+        return url
+
+    export_url = htmlize(export_url)
+
+    doc_title = None
+    doc_url = None
+    with DBSession() as session:
+        datadoc = datadoc_logic.get_data_doc_by_id(doc_id, session=session)
+        doc_title = datadoc.title or "Untitled"
+        env_name = datadoc.environment.name
+        doc_url = htmlize(
+            "{}".format(f"{DataHubSettings.PUBLIC_URL}/{env_name}/datadoc/{doc_id}/")
+        )
+
+    header = "{status}: {doc_title} has finished!".format(
+        status="Success" if is_success else "Failure", doc_title=doc_title
+    )
+    content = "Here is the url to the doc: {}.".format(doc_url)
+    if export_url is not None:
+        content += " Here is the exported query result url: {}.".format(export_url)
+    elif not is_success:
+        content += " The failure reason: {}.".format(error_msg or "")
+
+    return header, content
diff --git a/datahub/server/tasks/run_query.py b/datahub/server/tasks/run_query.py
index 9ea9a8286..d8533253f 100644
--- a/datahub/server/tasks/run_query.py
+++ b/datahub/server/tasks/run_query.py
@@ -154,7 +154,7 @@ def assert_safe_query(query, engine_id, session=None):
     try:
         from lib.metastore.utils import MetastoreTableACLChecker

-        LOG.info("assert_safe_query")
+        LOG.debug("assert_safe_query")

         table_per_statement, _ = process_query(query)
         all_tables = [table for tables in table_per_statement for table in tables]
diff --git
a/datahub/tests/test_lib/query_analysis/__init__.py b/datahub/tests/test_lib/test_export/__init__.py similarity index 100% rename from datahub/tests/test_lib/query_analysis/__init__.py rename to datahub/tests/test_lib/test_export/__init__.py diff --git a/datahub/tests/test_lib/test_export/test_exporters/__init__.py b/datahub/tests/test_lib/test_export/test_exporters/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/datahub/tests/test_lib/test_export/test_exporters/test_gspread_exporter.py b/datahub/tests/test_lib/test_export/test_exporters/test_gspread_exporter.py new file mode 100644 index 000000000..17b02a451 --- /dev/null +++ b/datahub/tests/test_lib/test_export/test_exporters/test_gspread_exporter.py @@ -0,0 +1,35 @@ +from unittest import TestCase + +from lib.export.exporters.gspread_exporter import ( + worksheet_coord_to_coord, + coord_to_worksheet_coord, +) + + +class WorksheetCoordToCoordTestCase(TestCase): + def test_simple(self): + self.assertEqual(worksheet_coord_to_coord("A123"), (1, 123)) + self.assertEqual(worksheet_coord_to_coord("AA123"), (27, 123)) + self.assertEqual(worksheet_coord_to_coord("BA123"), (53, 123)) + + def test_equivalent(self): + cases = ["A123", "AA1234", "BA321", "DFA123", "AAA567", "ABC123"] + for case in cases: + self.assertEqual( + coord_to_worksheet_coord(*worksheet_coord_to_coord(case)), case + ) + + +class CoordToWorksheetCoordTestCase(TestCase): + def test_simple(self): + self.assertEqual(coord_to_worksheet_coord(1, 1), "A1") + self.assertEqual(coord_to_worksheet_coord(30, 20), "AD20") + self.assertEqual(coord_to_worksheet_coord(52, 321), "AZ321") + + def test_equivalent(self): + cases = range(1, 500, 7) + for case in cases: + self.assertEqual( + worksheet_coord_to_coord(coord_to_worksheet_coord(case, 123)), + (case, 123), + ) diff --git a/datahub/tests/test_lib/test_form/__init__.py b/datahub/tests/test_lib/test_form/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/datahub/tests/test_lib/test_form/test__init__.py b/datahub/tests/test_lib/test_form/test__init__.py new file mode 100644 index 000000000..66a46080c --- /dev/null +++ b/datahub/tests/test_lib/test_form/test__init__.py @@ -0,0 +1,106 @@ +from unittest import TestCase +from lib.form import ( + validate_form, + FormField, + ExpandableFormField, + StructFormField, + FormFieldType, +) + + +class ValidateFormTestCase(TestCase): + def test_null_field(self): + # Code coverage test cases + + # Null case test + self.assertEqual( + validate_form(FormField(required=True), None), + (False, "Required field is missing"), + ) + self.assertEqual(validate_form(FormField(), None), (True, "")) + + def test_unknown_field(self): + self.assertEqual(validate_form(None, None), (False, "Unexpected form type")) + + def test_string_field(self): + # String Tests + self.assertEqual( + validate_form(FormField(), 123), (False, "Field value is not a string") + ) + self.assertEqual(validate_form(FormField(), "123"), (True, "")) + + self.assertEqual( + validate_form(FormField(regex="^[a-z]+$"), "datahub2"), + (False, "Field value does not match regex"), + ) + self.assertEqual( + validate_form(FormField(regex="^[a-z]+$"), "datahub"), (True, "") + ) + + def test_number_field(self): + self.assertEqual( + validate_form(FormField(field_type=FormFieldType.Number), "123"), + (False, "Field value is not a number"), + ) + self.assertEqual( + validate_form(FormField(field_type=FormFieldType.Number), 123), (True, "") + ) + self.assertEqual( + 
validate_form(FormField(field_type=FormFieldType.Number), 123.123), + (True, ""), + ) + + def test_bool_field(self): + self.assertEqual( + validate_form(FormField(field_type=FormFieldType.Boolean), "123"), + (False, "Field value is not a boolean"), + ) + self.assertEqual( + validate_form(FormField(field_type=FormFieldType.Boolean), 123), + (False, "Field value is not a boolean"), + ) + self.assertEqual( + validate_form(FormField(field_type=FormFieldType.Boolean), True), (True, "") + ) + + def test_array_field(self): + form = ExpandableFormField(of=FormField(), min=2, max=4) + self.assertEqual( + validate_form(form, "123"), (False, "Field value is not an array"), + ) + self.assertEqual( + validate_form(form, ["123"]), + (False, "Field value less than allowed length"), + ) + self.assertEqual( + validate_form(form, ["123"] * 5), + (False, "Field value more than allowed length"), + ) + self.assertEqual( + validate_form(form, ["123", "123", 123]), + (False, "Field value is not a string"), + ) + self.assertEqual(validate_form(form, ["123", "456", "789"]), (True, "")) + + def test_dict_field(self): + form = StructFormField( + name=FormField(), + phone_numbers=ExpandableFormField(of=FormField(), min=1, max=2), + ) + self.assertEqual( + validate_form(form, "123"), (False, "Field value is not a dictionary"), + ) + self.assertEqual( + validate_form(form, {"phone_numbers": [1234], "name": "bob"}), + (False, "Field value is not a string"), + ) + self.assertEqual( + validate_form(form, {"phone_numbers": ["1234"] * 3, "name": "bob"}), + (False, "Field value more than allowed length"), + ) + self.assertEqual( + validate_form(form, {"phone_numbers": ["1234"], "name": "bob"}), (True, ""), + ) + self.assertEqual( + validate_form(form, {"phone_numbers": ["1234"],}), (True, ""), + ) diff --git a/datahub/tests/test_lib/test_query_analysis/__init__.py b/datahub/tests/test_lib/test_query_analysis/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/datahub/tests/test_lib/query_analysis/test_lineage.py b/datahub/tests/test_lib/test_query_analysis/test_lineage.py similarity index 100% rename from datahub/tests/test_lib/query_analysis/test_lineage.py rename to datahub/tests/test_lib/test_query_analysis/test_lineage.py diff --git a/datahub/tests/test_lib/query_analysis/test_samples.py b/datahub/tests/test_lib/test_query_analysis/test_samples.py similarity index 100% rename from datahub/tests/test_lib/query_analysis/test_samples.py rename to datahub/tests/test_lib/test_query_analysis/test_samples.py diff --git a/datahub/tests/test_lib/query_analysis/test_templating.py b/datahub/tests/test_lib/test_query_analysis/test_templating.py similarity index 100% rename from datahub/tests/test_lib/query_analysis/test_templating.py rename to datahub/tests/test_lib/test_query_analysis/test_templating.py diff --git a/datahub/webapp/components/AppAdmin/AdminMetastore.tsx b/datahub/webapp/components/AppAdmin/AdminMetastore.tsx index f0487f4c7..fe3a15694 100644 --- a/datahub/webapp/components/AppAdmin/AdminMetastore.tsx +++ b/datahub/webapp/components/AppAdmin/AdminMetastore.tsx @@ -12,7 +12,7 @@ import { ITaskSchedule } from 'const/schedule'; import { AdminDeletedList } from './AdminDeletedList'; import { AdminAuditLogButton } from 'components/AdminAuditLog/AdminAuditLogButton'; -import { IAdminTask } from './AdminTask'; + import { TaskEditor } from 'components/Task/TaskEditor'; import { Button } from 'ui/Button/Button'; @@ -81,7 +81,7 @@ export const AdminMetastore: React.FunctionComponent = ({ const { data: 
        metastoreUpdateSchedule,
        forceFetch: loadMetastoreUpdateSchedule,
-    } = useDataFetch<IAdminTask>({
+    } = useDataFetch<ITaskSchedule>({
        url: `/schedule/name/update_metastore_${metastoreId}/`,
    });
diff --git a/datahub/webapp/components/AppAdmin/AdminTask.tsx b/datahub/webapp/components/AppAdmin/AdminTask.tsx
index 8b5d9229e..46b1fae75 100644
--- a/datahub/webapp/components/AppAdmin/AdminTask.tsx
+++ b/datahub/webapp/components/AppAdmin/AdminTask.tsx
@@ -8,6 +8,7 @@
 import history from 'lib/router-history';
 import { sendNotification } from 'lib/dataHubUI';
 import { useDataFetch } from 'hooks/useDataFetch';
+import { ITaskSchedule, TaskType } from 'const/schedule';

 import { TaskEditor } from 'components/Task/TaskEditor';
 import { Button } from 'ui/Button/Button';
@@ -21,24 +22,6 @@ import { AdminAuditLogButton } from 'components/AdminAuditLog/AdminAuditLogButto

 import './AdminTask.scss';

-interface IProps {}
-
-export interface IAdminTask {
-    id: number;
-    name: string;
-    task: string;
-    task_type: TaskType;
-    cron: string;
-    args: any[];
-    kwargs: any[];
-    options: any;
-    last_run_at: number;
-    total_run_count: number;
-    enabled: boolean;
-}
-
-type TaskType = 'prod' | 'user';
-
 const tableColumns = [
     'id',
     'name',
@@ -71,14 +54,14 @@ const tableColumnAligns: Record = {
     enabled: 'center',
 };

-export const AdminTask: React.FunctionComponent<IProps> = () => {
+export const AdminTask: React.FunctionComponent<{}> = () => {
     const { id: detailTaskId } = useParams();
     const [type, setType] = React.useState<TaskType>('prod');
     const [searchString, setSearchString] = React.useState('');

     const { data: taskList, forceFetch: loadTaskList } = useDataFetch<
-        IAdminTask[]
+        ITaskSchedule[]
     >({
         url: '/schedule/',
     });
@@ -111,7 +94,7 @@
     }, []);

     const formatCell = React.useCallback(
-        (index: number, column: string, row: IAdminTask) => {
+        (index: number, column: string, row: ITaskSchedule) => {
             const key = column;
             const value = row[key];
             const taskId = row.id;
diff --git a/datahub/webapp/components/DataDoc/DataDoc.tsx b/datahub/webapp/components/DataDoc/DataDoc.tsx
index cec4ac7b3..720931907 100644
--- a/datahub/webapp/components/DataDoc/DataDoc.tsx
+++ b/datahub/webapp/components/DataDoc/DataDoc.tsx
@@ -527,11 +527,6 @@
         }
     };

-    @bind
-    public runAllQueryCells() {
-        ds.save(`/datadoc/${this.props.docId}/run/`);
-    }
-
     @bind
     public renderLazyDataDocCell(
         cell: IDataCell,
@@ -687,7 +682,6 @@
                     isSaving={isSavingDataDoc}
                     isEditable={isEditable}
                     isConnected={connected}
-                    onRunAllQueries={this.runAllQueryCells}
                 />
             );
diff --git a/datahub/webapp/components/DataDocChartCell/DataDocChartComposer.tsx b/datahub/webapp/components/DataDocChartCell/DataDocChartComposer.tsx
index 00454b099..6ad211f0e 100644
--- a/datahub/webapp/components/DataDocChartCell/DataDocChartComposer.tsx
+++ b/datahub/webapp/components/DataDocChartCell/DataDocChartComposer.tsx
@@ -26,10 +26,10 @@ import { mapMetaToFormVals } from 'lib/chart/chart-meta-processing';
 import { transformData } from 'lib/chart/chart-data-transformation';
 import { useChartSource } from 'hooks/chart/useChartSource';

-import { DataDocChart } from './DataDocChart';
 import { QueryExecutionPicker } from 'components/ExecutionPicker/QueryExecutionPicker';
 import { StatementExecutionPicker } from 'components/ExecutionPicker/StatementExecutionPicker';
 import { StatementResultTable } from 'components/DataDocStatementExecution/StatementResultTable';
+import { queryCellSelector
} from 'redux/dataDoc/selector'; import { Button } from 'ui/Button/Button'; import { IconButton } from 'ui/Button/IconButton'; @@ -38,7 +38,6 @@ import { Checkbox } from 'ui/Form/Checkbox'; import { FormField, FormSectionHeader } from 'ui/Form/FormField'; import { Tabs } from 'ui/Tabs/Tabs'; -import './DataDocChartComposer.scss'; import { Level, LevelItem } from 'ui/Level/Level'; import { SimpleReactSelect } from 'ui/SimpleReactSelect/SimpleReactSelect'; import { getDefaultScaleType } from 'lib/chart/chart-utils'; @@ -46,6 +45,10 @@ import { NumberField } from 'ui/FormikField/NumberField'; import { ReactSelectField } from 'ui/FormikField/ReactSelectField'; import { FormWrapper } from 'ui/Form/FormWrapper'; import { SimpleField } from 'ui/FormikField/SimpleField'; +import { DisabledSection } from 'ui/DisabledSection/DisabledSection'; + +import { DataDocChart } from './DataDocChart'; +import './DataDocChartComposer.scss'; interface IProps { meta?: IDataChartCellMeta; @@ -129,23 +132,9 @@ const DataDocChartComposerComponent: React.FunctionComponent< ]); // getting redux state - const queryCellOptions = useSelector((state: IStoreState) => { - const cellList = state.dataDoc.dataDocById[dataDocId].cells; - return cellList - .filter( - (id) => state.dataDoc.dataDocCellById[id].cell_type === 'query' - ) - - .map((id, idx) => { - const cellMeta: IDataQueryCellMeta = - state.dataDoc.dataDocCellById[id].meta; - const title = cellMeta.title || `Query #${idx + 1}`; - return { - id, - title, - }; - }); - }); + const queryCellOptions = useSelector((state: IStoreState) => + queryCellSelector(state, { docId: dataDocId }) + ); React.useEffect(() => { if ( @@ -738,11 +727,11 @@ const DataDocChartComposerComponent: React.FunctionComponent< const formDOM = ( -
+ {formTab === 'data' && dataTabDOM} {formTab === 'chart' && chartTabDOM} {formTab === 'visuals' && visualsTabDOM} -
+
); diff --git a/datahub/webapp/components/DataDocRightSidebar/DataDocRightSidebar.tsx b/datahub/webapp/components/DataDocRightSidebar/DataDocRightSidebar.tsx index 1452dcafc..92bf034c2 100644 --- a/datahub/webapp/components/DataDocRightSidebar/DataDocRightSidebar.tsx +++ b/datahub/webapp/components/DataDocRightSidebar/DataDocRightSidebar.tsx @@ -20,13 +20,11 @@ interface IProps { changeDataDocMeta: (docId: number, meta: Record) => any; onClone: () => any; - onRunAllQueries: () => void; } export const DataDocRightSidebar: React.FunctionComponent = ({ onClone, changeDataDocMeta, - onRunAllQueries, isSaving, isEditable, @@ -108,12 +106,6 @@ export const DataDocRightSidebar: React.FunctionComponent = ({ />
- {/* */} {templateButtonDOM} {scheduleButtonDOM} = ( onDelete, isEditable, }) => { - const { isLoading, isError, data, forceFetch } = useDataFetch({ + const { isLoading, isError, data, forceFetch } = useDataFetch< + IDataDocTaskSchedule + >({ url: `/datadoc/${docId}/schedule/`, }); const [currentTab, setCurrentTab] = React.useState('schedule'); @@ -69,15 +70,21 @@ export const DataDocSchedule: React.FunctionComponent = ( const getScheduleDOM = () => { let formDOM = null; - if (isEditable) { + if (data || isEditable) { // When editable, make create/update form formDOM = ( + isEditable={isEditable} + docId={docId} + cron={data?.cron ?? null} + enabled={data?.enabled ?? false} + kwargs={data?.kwargs ?? {}} + onCreate={(cron, kwargs) => ds - .save(`/datadoc/${docId}/schedule/`, { cron }) + .save(`/datadoc/${docId}/schedule/`, { + cron, + kwargs, + }) .then(() => { sendNotification('Schedule Created!'); forceFetch(); @@ -86,11 +93,12 @@ export const DataDocSchedule: React.FunctionComponent = ( } }) } - onUpdate={(cron, enabled) => + onUpdate={(cron, enabled, kwargs) => ds .update(`/datadoc/${docId}/schedule/`, { cron, enabled, + kwargs, }) .then(() => { sendNotification('Schedule Updated!'); @@ -113,18 +121,20 @@ export const DataDocSchedule: React.FunctionComponent = ( }) : null } + onRun={ + data + ? () => + ds + .save(`/datadoc/${docId}/schedule/run/`) + .then(() => { + sendNotification( + 'DataDoc execution started!' + ); + }) + : null + } /> ); - } else if (data) { - // Readonly view - const recurrence = cronToRecurrence(data.cron); - const enabled = data.enabled; - formDOM = ( -
-                <div>
-                    <div>
-                        Workflow {enabled ? 'Enabled' : 'Disabled'}
-                    </div>
-                    <div>
-                        {getHumanReadableRecurrence(recurrence)}
-                    </div>
-                </div>
- ); } else { // Readonly and no schedule formDOM = ( diff --git a/datahub/webapp/components/DataDocSchedule/DataDocScheduleForm.tsx b/datahub/webapp/components/DataDocSchedule/DataDocScheduleForm.tsx index 0e343b87e..1b530009c 100644 --- a/datahub/webapp/components/DataDocSchedule/DataDocScheduleForm.tsx +++ b/datahub/webapp/components/DataDocSchedule/DataDocScheduleForm.tsx @@ -1,24 +1,45 @@ import React from 'react'; import { Formik, Form } from 'formik'; import * as Yup from 'yup'; +import { useSelector } from 'react-redux'; +import { IDataDocScheduleKwargs, NotifyOn } from 'const/schedule'; import { cronToRecurrence, recurrenceToCron } from 'lib/utils/cron'; +import { getExporterAuthentication } from 'lib/result-export'; + +import { IStoreState } from 'redux/store/types'; +import { queryCellSelector } from 'redux/dataDoc/selector'; -import { Button } from 'ui/Button/Button'; import { RecurrenceEditor } from 'ui/ReccurenceEditor/RecurrenceEditor'; -import { FormField } from 'ui/Form/FormField'; +import { FormSectionHeader } from 'ui/Form/FormField'; import { FormWrapper } from 'ui/Form/FormWrapper'; import { InfoButton } from 'ui/Button/InfoButton'; +import { Level } from 'ui/Level/Level'; import { Title } from 'ui/Title/Title'; -import { ToggleSwitchField } from 'ui/FormikField/ToggleSwitchField'; +import { SimpleField } from 'ui/FormikField/SimpleField'; +import { DisabledSection } from 'ui/DisabledSection/DisabledSection'; +import { + SmartForm, + updateValue, + getDefaultFormValue, +} from 'ui/SmartForm/SmartForm'; +import { AsyncButton } from 'ui/AsyncButton/AsyncButton'; interface IDataDocScheduleFormProps { + isEditable: boolean; + docId: number; cron?: string; enabled?: boolean; + kwargs: IDataDocScheduleKwargs; - onCreate: (cron: string) => void; - onUpdate: (cron: string, enabled: boolean) => void; - onDelete?: () => void; + onCreate: (cron: string, kwargs: IDataDocScheduleKwargs) => Promise; + onUpdate: ( + cron: string, + enabled: boolean, + kwargs: IDataDocScheduleKwargs + ) => Promise; + onDelete?: () => Promise; + onRun?: () => Promise; } const scheduleFormSchema = Yup.object().shape({ @@ -37,25 +58,66 @@ const scheduleFormSchema = Yup.object().shape({ }), }), enabled: Yup.boolean().notRequired(), + kwargs: Yup.object().shape({ + notify_with: Yup.string().nullable(), + notify_on: Yup.mixed().when('notify_with', { + is: (val) => val != null, + then: Yup.mixed().required(), + }), + exporter_cell_id: Yup.number().nullable(), + exporter_name: Yup.string() + .nullable() + .when('exporter_cell_id', { + is: (val) => val != null, + then: Yup.string().required(), + }), + exporter_params: Yup.object(), + }), }); export const DataDocScheduleForm: React.FunctionComponent = ({ + isEditable, + + docId, cron, enabled, + kwargs, onCreate, onUpdate, onDelete, + onRun, }) => { + const queryCellOptions = useSelector((state: IStoreState) => + queryCellSelector(state, { docId }) + ); + const exporters = useSelector( + (state: IStoreState) => state.queryExecutions.statementExporters + ); + const isCreateForm = !Boolean(cron); const recurrence = cronToRecurrence(cron || '0 0 * * *'); const formValues = isCreateForm ? 
{ recurrence, + kwargs: { + notify_with: null, + notify_on: NotifyOn.ALL, + exporter_cell_id: null, + exporter_name: null, + exporter_params: {}, + }, } : { recurrence, enabled, + kwargs: { + notify_with: kwargs.notify_with, + notify_on: kwargs.notify_on, + exporter_cell_id: kwargs.exporter_cell_id, + exporter_name: kwargs.exporter_name, + exporter_params: kwargs.exporter_params, + }, }; return ( @@ -63,25 +125,161 @@ export const DataDocScheduleForm: React.FunctionComponent { + onSubmit={async (values) => { const cronRepr = recurrenceToCron(values.recurrence); + const exporter = + values.kwargs.exporter_cell_id != null + ? exporters.find( + (exp) => exp.name === values.kwargs.exporter_name + ) + : null; + if (exporter) { + await getExporterAuthentication(exporter); + } + if (isCreateForm) { - onCreate(cronRepr); + await onCreate(cronRepr, values.kwargs); } else { - onUpdate(cronRepr, values.enabled); + await onUpdate(cronRepr, enabled, values.kwargs); } }} > - {({ handleSubmit, values, errors, setFieldValue, isValid }) => { + {({ + submitForm, + values, + errors, + setFieldValue, + isValid, + dirty, + }) => { const formTitle = isCreateForm ? 'Add new schedule' : 'Edit schedule'; - const enabledField = isCreateForm ? null : ( - - - + const enabledField = !isCreateForm && ( + + ); + + const notificationField = ( + <> + Notification + + {values.kwargs.notify_with && ( + + !isNaN(Number(NotifyOn[key])) + ) + .map(([key, value]) => ({ + value, + label: key, + }))} + /> + )} + + ); + + const exporter = exporters.find( + (exp) => exp.name === values.kwargs.exporter_name ); + const exportField = ( + <> + Export Results + ({ + value: val.id, + label: val.title, + }))} + withDeselect + /> + {values.kwargs.exporter_cell_id != null && ( + ({ + value: exp.name, + label: exp.name, + }))} + onChange={(v) => { + setFieldValue('kwargs.exporter_name', v); + setFieldValue( + 'kwargs.exporter_params', + exporter?.form + ? getDefaultFormValue(exporter.form) + : {} + ); + }} + /> + )} + {values.kwargs.exporter_cell_id != null && + exporter?.form && ( + <> + + Export Parameters + + + setFieldValue( + 'kwargs.exporter_params', + updateValue( + values.kwargs + .exporter_params, + path, + value, + [undefined, ''] + ) + ) + } + /> + + )} + + ); + + const controlDOM = isEditable && ( + +
+                        <Level>
+                            <div>
+                                {onRun && (
+                                    <AsyncButton
+                                        disabled={dirty}
+                                        onClick={onRun}
+                                        title="Manual Run"
+                                    />
+                                )}
+                            </div>
+                            <div>
+                                {onDelete && (
+                                    <AsyncButton
+                                        title="Delete"
+                                        onClick={onDelete}
+                                    />
+                                )}
+                                <AsyncButton
+                                    disabled={!(dirty && isValid)}
+                                    onClick={submitForm}
+                                    title={isCreateForm ? 'Create' : 'Update'}
+                                />
+                            </div>
+                        </Level>
+                    );
+
                     return (
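Aside on the notificationField in the hunk above: TypeScript numeric enums compile to objects that also carry the reverse mapping (0 -> 'ALL' as well as 'ALL' -> 0), so Object.entries() yields every member twice, and the !isNaN(Number(NotifyOn[key])) filter keeps only the name -> number half before mapping to react-select options. A standalone sketch of the same derivation, assuming only the NotifyOn enum declared in const/schedule.ts further down this patch:

    enum NotifyOn {
        ALL = 0,
        ON_FAILURE = 1,
        ON_SUCCESS = 2,
    }

    // Equivalent to the isNaN check: keep only the name -> number entries.
    const notifyOnOptions = Object.entries(NotifyOn)
        .filter(([, value]) => typeof value === 'number')
        .map(([key, value]) => ({ value: value as number, label: key }));

    // notifyOnOptions:
    // [
    //     { value: 0, label: 'ALL' },
    //     { value: 1, label: 'ON_FAILURE' },
    //     { value: 2, label: 'ON_SUCCESS' },
    // ]
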
@@ -98,31 +296,21 @@ export const DataDocScheduleForm: React.FunctionComponent
-                            <RecurrenceEditor
-                                recurrence={values.recurrence}
-                                recurrenceError={errors?.recurrence}
-                                setRecurrence={(val) =>
-                                    setFieldValue('recurrence', val)
-                                }
-                            />
-                            {enabledField}
-                            <div>
-                                {onDelete && (
-                                    <Button
-                                        title="Delete"
-                                        onClick={onDelete}
-                                    />
-                                )}
-                                <Button
-                                    disabled={!isValid}
-                                    onClick={() => handleSubmit()}
-                                    title={isCreateForm ? 'Create' : 'Update'}
-                                />
-                            </div>
+                            <DisabledSection disabled={!isEditable}>
+                                <RecurrenceEditor
+                                    recurrence={values.recurrence}
+                                    recurrenceError={errors?.recurrence}
+                                    setRecurrence={(val) =>
+                                        setFieldValue('recurrence', val)
+                                    }
+                                />
+                                {enabledField}
+                                {notificationField}
+                                {exportField}
+                            </DisabledSection>
+                            {controlDOM}
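A note on the getExporterAuthentication call in onSubmit above (the helper is added in lib/result-export.ts later in this patch): the parent tab opens the exporter's auth url, exposes a global window.receiveChildMessage, and resolves once the child window calls it, while a one-second poll on authWindow.closed rejects if the user simply closes the popup. The callback page itself is server-rendered and not part of this diff; a hypothetical child page would only need something like:

    // Hypothetical OAuth callback snippet (assumed, not in this patch):
    // hand control back to the tab that opened the popup.
    const opener = window.opener as
        | (Window & { receiveChildMessage?: () => void })
        | null;

    if (opener && opener.receiveChildMessage) {
        // The parent closes the popup and resolves its pending promise.
        opener.receiveChildMessage();
    }
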
diff --git a/datahub/webapp/components/DataDocSchedule/DataDocScheduleRunLogs.tsx b/datahub/webapp/components/DataDocSchedule/DataDocScheduleRunLogs.tsx index 8feca5c6b..d972eb4ab 100644 --- a/datahub/webapp/components/DataDocSchedule/DataDocScheduleRunLogs.tsx +++ b/datahub/webapp/components/DataDocSchedule/DataDocScheduleRunLogs.tsx @@ -39,7 +39,7 @@ export const DataDocScheduleRunLogs: React.FunctionComponent<{ docId: number; }> = ({ docId }) => { const { isLoading, isError, data } = useDataFetch({ - url: `/datadoc/${docId}/run/`, + url: `/datadoc/${docId}/schedule/logs/`, }); if (isLoading) { diff --git a/datahub/webapp/components/DataDocStatementExecutionBar/ResultExportDropdown.tsx b/datahub/webapp/components/DataDocStatementExecutionBar/ResultExportDropdown.tsx index a27dd456c..5300e5363 100644 --- a/datahub/webapp/components/DataDocStatementExecutionBar/ResultExportDropdown.tsx +++ b/datahub/webapp/components/DataDocStatementExecutionBar/ResultExportDropdown.tsx @@ -4,7 +4,7 @@ import { useSelector, useDispatch } from 'react-redux'; import { IStatementExecution, IStatementResult, - IStatementExporter, + IQueryResultExporter, } from 'redux/queryExecutions/types'; import * as queryExecutionsActions from 'redux/queryExecutions/action'; import { IStoreState, Dispatch } from 'redux/store/types'; @@ -12,6 +12,7 @@ import { IStoreState, Dispatch } from 'redux/store/types'; import ds from 'lib/datasource'; import * as Utils from 'lib/utils'; import { getStatementExecutionResultDownloadUrl } from 'lib/query-execution'; +import { getExporterAuthentication } from 'lib/result-export'; import { DropdownMenu } from 'ui/DropdownMenu/DropdownMenu'; import { Button } from 'ui/Button/Button'; @@ -103,11 +104,13 @@ export const ResultExportDropdown: React.FunctionComponent = ({ }, [statementId, statementResult, loadStatementResult]); const onGenericExportClick = React.useCallback( - async (exporter: IStatementExporter) => { + async (exporter: IQueryResultExporter) => { try { + await getExporterAuthentication(exporter); + sendNotification(`Exporting, please wait`); const { data } = await ds.fetch( - `/statement_execution/${statementId}/export/`, + `/query_execution_exporter/statement_execution/${statementId}/`, { export_name: exporter.name, } diff --git a/datahub/webapp/components/Task/TaskEditor.tsx b/datahub/webapp/components/Task/TaskEditor.tsx index 77a1be681..881bb91a9 100644 --- a/datahub/webapp/components/Task/TaskEditor.tsx +++ b/datahub/webapp/components/Task/TaskEditor.tsx @@ -13,7 +13,7 @@ import { import { sendNotification, sendConfirm } from 'lib/dataHubUI'; import { useDataFetch } from 'hooks/useDataFetch'; -import { IAdminTask } from 'components/AppAdmin/AdminTask'; +import { ITaskSchedule } from 'const/schedule'; import { TaskStatus } from 'components/Task/TaskStatus'; import { AsyncButton } from 'ui/AsyncButton/AsyncButton'; @@ -34,7 +34,7 @@ import { AdminAuditLogButton } from 'components/AdminAuditLog/AdminAuditLogButto type TaskEditorTabs = 'edit' | 'history'; interface IProps { - task: Partial; + task: Partial; onTaskUpdate?: () => void; onTaskDelete?: () => void; onTaskCreate?: (id?: number) => void; diff --git a/datahub/webapp/const/schedule.ts b/datahub/webapp/const/schedule.ts index 0df38630a..77a10c11f 100644 --- a/datahub/webapp/const/schedule.ts +++ b/datahub/webapp/const/schedule.ts @@ -13,7 +13,11 @@ export interface ITaskSchedule { task: string; task_type: TaskType; cron: string; + args: any[]; + kwargs: Record; + options: Record; last_run_at: number; + total_run_count: 
number; enabled: boolean; } @@ -26,3 +30,21 @@ export interface ITaskStatusRecord { updated_at: number; task_type: TaskType; } + +export enum NotifyOn { + ALL = 0, + ON_FAILURE = 1, + ON_SUCCESS = 2, +} + +export interface IDataDocScheduleKwargs { + notify_with?: null | 'email' | 'slack'; + notify_on?: NotifyOn; + exporter_cell_id?: number; + exporter_name?: string; + exporter_params?: Record; +} + +export interface IDataDocTaskSchedule extends ITaskSchedule { + kwargs: IDataDocScheduleKwargs; +} diff --git a/datahub/webapp/lib/result-export.ts b/datahub/webapp/lib/result-export.ts new file mode 100644 index 000000000..e1ba124d5 --- /dev/null +++ b/datahub/webapp/lib/result-export.ts @@ -0,0 +1,41 @@ +import ds from 'lib/datasource'; +import { IQueryResultExporter } from 'redux/queryExecutions/types'; + +export function getExporterAuthentication( + exporter: IQueryResultExporter +): Promise { + return new Promise(async (resolve, reject) => { + if (!exporter.requires_auth) { + resolve(); + return; + } + + const { data: url } = await ds.fetch( + '/query_execution_exporter/auth/', + { + export_name: exporter.name, + } + ); + if (!url) { + resolve(); + return; + } + + const authWindow = window.open(url); + const receiveMessage = () => { + authWindow.close(); + delete (window as any).receiveChildMessage; + window.removeEventListener('message', receiveMessage, false); + resolve(); + }; + (window as any).receiveChildMessage = receiveMessage; + + // If window is closed without having received message + const timer = setInterval(() => { + if (authWindow.closed) { + clearInterval(timer); + reject(Error('Authentication process failed')); + } + }, 1000); + }); +} diff --git a/datahub/webapp/redux/dataDoc/selector.ts b/datahub/webapp/redux/dataDoc/selector.ts index 95ffd3677..182a8cd0c 100644 --- a/datahub/webapp/redux/dataDoc/selector.ts +++ b/datahub/webapp/redux/dataDoc/selector.ts @@ -7,7 +7,7 @@ import { permissionToReadWrite, } from 'lib/data-doc/datadoc-permission'; import { IStoreState } from 'redux/store/types'; -import { IDataCell } from 'const/datadoc'; +import { IDataCell, IDataQueryCellMeta } from 'const/datadoc'; import { myUserInfoSelector } from 'redux/user/selector'; @@ -189,3 +189,17 @@ export const canCurrentUserEditSelector = createSelector( return permissionToReadWrite(permission).write; } ); + +export const queryCellSelector = createSelector(dataDocCellsSelector, (cells) => + cells + .filter((cell) => cell.cell_type === 'query') + + .map((cell, index) => { + const cellMeta: IDataQueryCellMeta = cell.meta; + const title = cellMeta.title ?? 
`Query #${index + 1}`; + return { + id: cell.id, + title, + }; + }) +); diff --git a/datahub/webapp/redux/queryExecutions/action.ts b/datahub/webapp/redux/queryExecutions/action.ts index e1b7b2ddf..8dbe7deee 100644 --- a/datahub/webapp/redux/queryExecutions/action.ts +++ b/datahub/webapp/redux/queryExecutions/action.ts @@ -251,11 +251,11 @@ export function createQueryExecution( export function fetchExporters(): ThunkResult> { return async (dispatch) => { const { data: exporters } = await ds.fetch( - '/statement_execution_exporter/' + '/query_execution_exporter/' ); dispatch({ - type: '@@queryExecutions/RECEIVE_STATEMENT_EXECUTION_EXPORTERS', + type: '@@queryExecutions/RECEIVE_QUERY_RESULT_EXPORTERS', payload: { exporters, }, diff --git a/datahub/webapp/redux/queryExecutions/reducer.ts b/datahub/webapp/redux/queryExecutions/reducer.ts index 01844ad17..134d76d38 100644 --- a/datahub/webapp/redux/queryExecutions/reducer.ts +++ b/datahub/webapp/redux/queryExecutions/reducer.ts @@ -307,7 +307,7 @@ function statementExporters( action: QueryExecutionAction ) { switch (action.type) { - case '@@queryExecutions/RECEIVE_STATEMENT_EXECUTION_EXPORTERS': { + case '@@queryExecutions/RECEIVE_QUERY_RESULT_EXPORTERS': { return action.payload.exporters; } } diff --git a/datahub/webapp/redux/queryExecutions/types.ts b/datahub/webapp/redux/queryExecutions/types.ts index 37a71f860..98a0116b2 100644 --- a/datahub/webapp/redux/queryExecutions/types.ts +++ b/datahub/webapp/redux/queryExecutions/types.ts @@ -64,9 +64,11 @@ export interface IQueryError { error_message: string; } -export interface IStatementExporter { +export interface IQueryResultExporter { name: string; type: 'url' | 'text'; + requires_auth: boolean; + form: IStructFormField; } export interface IReceiveQueryExecutionsAction extends Action { @@ -142,9 +144,9 @@ export interface IReceiveStatementExecutionUpdateAction extends Action { } export interface IReceiveStatementExporters extends Action { - type: '@@queryExecutions/RECEIVE_STATEMENT_EXECUTION_EXPORTERS'; + type: '@@queryExecutions/RECEIVE_QUERY_RESULT_EXPORTERS'; payload: { - exporters: IStatementExporter[]; + exporters: IQueryResultExporter[]; }; } @@ -177,7 +179,7 @@ export interface IQueryExecutionState { statementResultById: Record; statementLogById: Record; - statementExporters: IStatementExporter[]; + statementExporters: IQueryResultExporter[]; } export type ThunkResult = ThunkAction< diff --git a/datahub/webapp/ui/DisabledSection/DisabledSection.tsx b/datahub/webapp/ui/DisabledSection/DisabledSection.tsx new file mode 100644 index 000000000..5284bb417 --- /dev/null +++ b/datahub/webapp/ui/DisabledSection/DisabledSection.tsx @@ -0,0 +1,14 @@ +import styled from 'styled-components'; + +interface IDisabledSectionProps { + disabled: boolean; +} + +export const DisabledSection = styled.div` + ${({ disabled = true }: IDisabledSectionProps) => + disabled && + ` + cursor: no-drop; + pointer-events: none; +`}; +`; diff --git a/datahub/webapp/ui/JsonViewer/JsonViewer.tsx b/datahub/webapp/ui/JsonViewer/JsonViewer.tsx index 6f75fbb42..bc7591902 100644 --- a/datahub/webapp/ui/JsonViewer/JsonViewer.tsx +++ b/datahub/webapp/ui/JsonViewer/JsonViewer.tsx @@ -12,7 +12,7 @@ const StyledJsonViewer = styled.div` `; export interface IJsonViewerProps { - value?: string | {}; + value: string | {}; indent?: number; replacerFunction?: () => any; } diff --git a/datahub/webapp/ui/SimpleReactSelect/SimpleReactSelect.tsx b/datahub/webapp/ui/SimpleReactSelect/SimpleReactSelect.tsx index f24428f5b..fa6f98a65 100644 
--- a/datahub/webapp/ui/SimpleReactSelect/SimpleReactSelect.tsx
+++ b/datahub/webapp/ui/SimpleReactSelect/SimpleReactSelect.tsx
@@ -2,7 +2,7 @@ import React, { useMemo, useCallback } from 'react';
 import { Props as ReactSelectProps } from 'react-select/lib/Select';
 import Select from 'react-select';
 
-import { defaultReactSelectStyles } from 'lib/utils/react-select';
+import { makeReactSelectStyle } from 'lib/utils/react-select';
 
 interface ISelectOption<T> {
     value: T;
     label: string;
 }
@@ -18,6 +18,8 @@ export interface ISimpleReactSelectProps<T> {
     selectProps?: Partial<ReactSelectProps<T>>;
 }
 
+const reactSelectStyle = makeReactSelectStyle(true);
+
 export function SimpleReactSelect<T>({
     options,
     value,
@@ -44,7 +46,8 @@ export function SimpleReactSelect<T>({
     return (