Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #12167: Support for Stored Procedures as another entity under Database Schema #12999

Merged
merged 5 commits into from
Aug 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions bootstrap/sql/migrations/native/1.2.0/mysql/schemaChanges.sql
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,17 @@ CREATE TABLE IF NOT EXISTS search_index_entity (
UPDATE ingestion_pipeline_entity
SET json = JSON_REPLACE(json, '$.airflowConfig.retries', 0)
WHERE JSON_EXTRACT(json, '$.airflowConfig.retries') IS NOT NULL;


-- create stored procedure entity
CREATE TABLE IF NOT EXISTS stored_procedure_entity (
id VARCHAR(36) GENERATED ALWAYS AS (json ->> '$.id') STORED NOT NULL,
name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL,
fqnHash VARCHAR(256) NOT NULL COLLATE ascii_bin,
json JSON NOT NULL,
updatedAt BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.updatedAt') NOT NULL,
updatedBy VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.updatedBy') NOT NULL,
deleted BOOLEAN GENERATED ALWAYS AS (json -> '$.deleted'),
PRIMARY KEY (id),
UNIQUE (fqnHash)
);
13 changes: 13 additions & 0 deletions bootstrap/sql/migrations/native/1.2.0/postgres/schemaChanges.sql
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,16 @@ CREATE TABLE IF NOT EXISTS search_index_entity (
-- We were hardcoding retries to 0. Since we are now using the IngestionPipeline to set them, keep existing ones to 0.
UPDATE ingestion_pipeline_entity
SET json = jsonb_set(json::jsonb, '{airflowConfig,retries}', '0', true);

-- create stored procedure entity
CREATE TABLE IF NOT EXISTS stored_procedure_entity (
id VARCHAR(36) GENERATED ALWAYS AS (json ->> 'id') STORED NOT NULL,
name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL,
fqnHash VARCHAR(256) NOT NULL,
json JSONB NOT NULL,
updatedAt BIGINT GENERATED ALWAYS AS ((json ->> 'updatedAt')::bigint) STORED NOT NULL,
updatedBy VARCHAR(256) GENERATED ALWAYS AS (json ->> 'updatedBy') STORED NOT NULL,
deleted BOOLEAN GENERATED ALWAYS AS ((json ->> 'deleted')::boolean) STORED,
PRIMARY KEY (id),
UNIQUE (fqnHash)
);
60 changes: 60 additions & 0 deletions ingestion/examples/sample_data/datasets/stored_procedures.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
{
"storedProcedures": [
{
"name": "update_dim_address_table",
"description": "This stored procedure updates dim_address table",
"version": 0.1,
"updatedAt": 1638354087391,
"updatedBy": "anonymous",
"href": "http://localhost:8585/api/v1/tables/3cda8ecb-f4c6-4ed4-8506-abe965b54b86",
"storedProcedureCode": {
"langauge": "SQL",
"code": "CREATE OR REPLACE PROCEDURE output_message(message VARCHAR)\nRETURNS VARCHAR NOT NULL\nLANGUAGE SQL\nAS\n$$\nBEGIN\n RETURN message;\nEND;\n$$\n;"
},
"database": {
"id": "50da1ff8-4e1d-4967-8931-45edbf4dd908",
"type": "database",
"name": "sample_data.ecommerce_db",
"description": "This **mock** database contains tables related to shopify sales and orders with related dimension tables.",
"href": "http://localhost:8585/api/v1/databases/50da1ff8-4e1d-4967-8931-45edbf4dd908"
},
"tags": [],
"followers": [],
"databaseSchema": {
"id": "d7be1e2c-b3dc-11ec-b909-0242ac120002",
"type": "databaseSchema",
"name": "sample_data.ecommerce_db.shopify",
"description": "This **mock** Schema contains tables related to shopify sales and orders with related dimension tables.",
"href": "http://localhost:8585/api/v1/databaseSchemas/d7be1e2c-b3dc-11ec-b909-0242ac120002"
}
},
{
"name": "update_orders_table",
"description": "This stored procedure is written java script to update the orders table",
"version": 0.1,
"updatedAt": 1638354087391,
"updatedBy": "anonymous",
"href": "http://localhost:8585/api/v1/tables/3cda8ecb-f4c6-4ed4-8506-abe965b54b86",
"storedProcedureCode": {
"langauge": "JavaScript",
"code": "create or replace procedure read_result_set()\n returns float not null\n language javascript\n as \n $$ \n var my_sql_command = \"select * from table1\";\n var statement1 = snowflake.createStatement( {sqlText: my_sql_command} );\n var result_set1 = statement1.execute();\n // Loop through the results, processing one row at a time... \n while (result_set1.next()) {\n var column1 = result_set1.getColumnValue(1);\n var column2 = result_set1.getColumnValue(2);\n // Do something with the retrieved values...\n }\n return 0.0; // Replace with something more useful.\n $$\n ;"
},
"database": {
"id": "50da1ff8-4e1d-4967-8931-45edbf4dd908",
"type": "database",
"name": "sample_data.ecommerce_db",
"description": "This **mock** database contains tables related to shopify sales and orders with related dimension tables.",
"href": "http://localhost:8585/api/v1/databases/50da1ff8-4e1d-4967-8931-45edbf4dd908"
},
"tags": [],
"followers": [],
"databaseSchema": {
"id": "d7be1e2c-b3dc-11ec-b909-0242ac120002",
"type": "databaseSchema",
"name": "sample_data.ecommerce_db.shopify",
"description": "This **mock** Schema contains tables related to shopify sales and orders with related dimension tables.",
"href": "http://localhost:8585/api/v1/databaseSchemas/d7be1e2c-b3dc-11ec-b909-0242ac120002"
}
}
]
}
24 changes: 2 additions & 22 deletions ingestion/src/metadata/ingestion/ometa/ometa_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,20 +67,6 @@
T = TypeVar("T", bound=BaseModel)
C = TypeVar("C", bound=BaseModel)

# Helps us dynamically load the Entity class path in the
# generated module.
MODULE_PATH = {
"policy": "policies",
"service": "services",
"tag": "classification",
"classification": "classification",
"test": "tests",
"user": "teams",
"role": "teams",
"team": "teams",
"workflow": "automations",
}


class MissingEntityTypeException(Exception):
"""
Expand Down Expand Up @@ -195,12 +181,7 @@ def get_module_path(self, entity: Type[T]) -> str:
Based on the entity, return the module path
it is found inside generated
"""

for key, value in MODULE_PATH.items():
if key in entity.__name__.lower():
return value

return self.data_path
return entity.__module__.split(".")[-2]

def get_create_entity_type(self, entity: Type[T]) -> Type[C]:
"""
Expand Down Expand Up @@ -247,8 +228,8 @@ def get_entity_from_create(self, create: Type[C]) -> Type[T]:
.replace("testdefinition", "testDefinition")
.replace("testcase", "testCase")
.replace("searchindex", "searchIndex")
.replace("storedprocedure", "storedProcedure")
)

class_path = ".".join(
filter(
None,
Expand All @@ -260,7 +241,6 @@ def get_entity_from_create(self, create: Type[C]) -> Type[T]:
],
)
)

entity_class = getattr(
__import__(class_path, globals(), locals(), [class_name]), class_name
)
Expand Down
6 changes: 6 additions & 0 deletions ingestion/src/metadata/ingestion/ometa/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@
from metadata.generated.schema.api.data.createSearchIndex import (
CreateSearchIndexRequest,
)
from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.data.createTopic import CreateTopicRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
Expand Down Expand Up @@ -98,6 +101,7 @@
from metadata.generated.schema.entity.data.query import Query
from metadata.generated.schema.entity.data.report import Report
from metadata.generated.schema.entity.data.searchIndex import SearchIndex
from metadata.generated.schema.entity.data.storedProcedure import StoredProcedure
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.data.topic import Topic
from metadata.generated.schema.entity.policies.policy import Policy
Expand Down Expand Up @@ -150,6 +154,8 @@
CreateContainerRequest.__name__: "/containers",
SearchIndex.__name__: "/searchIndexes",
CreateSearchIndexRequest.__name__: "/searchIndexes",
StoredProcedure.__name__: "/storedProcedures",
CreateStoredProcedureRequest.__name__: "/storedProcedures",
# Classifications
Tag.__name__: "/tags",
CreateTagRequest.__name__: "/tags",
Expand Down
71 changes: 71 additions & 0 deletions ingestion/src/metadata/ingestion/source/database/sample_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
from metadata.generated.schema.api.data.createSearchIndex import (
CreateSearchIndexRequest,
)
from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.data.createTableProfile import (
CreateTableProfileRequest,
Expand All @@ -60,6 +63,7 @@
MlStore,
)
from metadata.generated.schema.entity.data.pipeline import Pipeline, PipelineStatus
from metadata.generated.schema.entity.data.storedProcedure import StoredProcedureCode
from metadata.generated.schema.entity.data.table import (
ColumnProfile,
SystemProfile,
Expand Down Expand Up @@ -244,6 +248,13 @@ def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnecti
encoding=UTF_8,
)
)
self.stored_procedures = json.load(
open( # pylint: disable=consider-using-with
sample_data_folder + "/datasets/stored_procedures.json",
"r",
encoding=UTF_8,
)
)
self.database_service_json = json.load(
open( # pylint: disable=consider-using-with
sample_data_folder + "/datasets/service.json",
Expand Down Expand Up @@ -509,6 +520,7 @@ def next_record(self) -> Iterable[Entity]:
yield from self.ingest_users()
yield from self.ingest_glue()
yield from self.ingest_tables()
yield from self.ingest_stored_procedures()
yield from self.ingest_topics()
yield from self.ingest_charts()
yield from self.ingest_data_models()
Expand Down Expand Up @@ -687,6 +699,65 @@ def ingest_tables(self):
),
)

def ingest_stored_procedures(self):
"""
Ingest Sample Stored Procedures
"""

db = CreateDatabaseRequest(
name=self.database["name"],
description=self.database["description"],
service=self.database_service.fullyQualifiedName.__root__,
)
yield db

database_entity = fqn.build(
self.metadata,
entity_type=Database,
service_name=self.database_service.name.__root__,
database_name=db.name.__root__,
)

database_object = self.metadata.get_by_name(
entity=Database, fqn=database_entity
)

schema = CreateDatabaseSchemaRequest(
name=self.database_schema["name"],
description=self.database_schema["description"],
database=database_object.fullyQualifiedName,
)
yield schema

database_schema_entity = fqn.build(
self.metadata,
entity_type=DatabaseSchema,
service_name=self.database_service.name.__root__,
database_name=db.name.__root__,
schema_name=schema.name.__root__,
)

database_schema_object = self.metadata.get_by_name(
entity=DatabaseSchema, fqn=database_schema_entity
)

resp = self.metadata.list_entities(entity=User, limit=5)
self.user_entity = resp.entities

for stored_procedure in self.stored_procedures["storedProcedures"]:
stored_procedure = CreateStoredProcedureRequest(
name=stored_procedure["name"],
description=stored_procedure["description"],
storedProcedureCode=StoredProcedureCode(
**stored_procedure["storedProcedureCode"]
),
databaseSchema=database_schema_object.fullyQualifiedName,
tags=stored_procedure["tags"],
)

self.status.scanned(f"StoredProcedure Scanned: {stored_procedure.name}")
yield stored_procedure

def ingest_topics(self) -> Iterable[CreateTopicRequest]:
"""
Ingest Sample Topics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public final class Entity {
// Data asset entities
//
public static final String TABLE = "table";
public static final String STORED_PROCEDURE = "storedProcedure";
public static final String DATABASE = "database";
public static final String DATABASE_SCHEMA = "databaseSchema";
public static final String METRICS = "metrics";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.openmetadata.schema.entity.data.Query;
import org.openmetadata.schema.entity.data.Report;
import org.openmetadata.schema.entity.data.SearchIndex;
import org.openmetadata.schema.entity.data.StoredProcedure;
import org.openmetadata.schema.entity.data.Table;
import org.openmetadata.schema.entity.data.Topic;
import org.openmetadata.schema.entity.domains.DataProduct;
Expand Down Expand Up @@ -167,6 +168,9 @@ public interface CollectionDAO {
@CreateSqlObject
TableDAO tableDAO();

@CreateSqlObject
QueryDAO queryDAO();

@CreateSqlObject
UsageDAO usageDAO();

Expand Down Expand Up @@ -249,7 +253,7 @@ public interface CollectionDAO {
FeedDAO feedDAO();

@CreateSqlObject
QueryDAO queryDAO();
StoredProcedureDAO storedProcedureDAO();

@CreateSqlObject
ChangeEventDAO changeEventDAO();
Expand Down Expand Up @@ -1809,6 +1813,23 @@ default List<String> listAfter(ListFilter filter, int limit, String after) {
}
}

interface StoredProcedureDAO extends EntityDAO<StoredProcedure> {
@Override
default String getTableName() {
return "stored_procedure_entity";
}

@Override
default Class<StoredProcedure> getEntityClass() {
return StoredProcedure.class;
}

@Override
default String getNameHashColumn() {
return "fqnHash";
}
}

interface QueryDAO extends EntityDAO<Query> {
@Override
default String getTableName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@
import org.openmetadata.service.util.JsonUtils;

public class SearchIndexRepository extends EntityRepository<SearchIndex> {

public SearchIndexRepository(CollectionDAO dao) {
super(
SearchIndexResource.COLLECTION_PATH, Entity.SEARCH_INDEX, SearchIndex.class, dao.searchIndexDAO(), dao, "", "");
}

@Override
public void setFullyQualifiedName(SearchIndex searchIndex) {
searchIndex.setFullyQualifiedName(
Expand All @@ -63,11 +69,6 @@ public void setFullyQualifiedName(SearchIndex searchIndex) {
}
}

public SearchIndexRepository(CollectionDAO dao) {
super(
SearchIndexResource.COLLECTION_PATH, Entity.SEARCH_INDEX, SearchIndex.class, dao.searchIndexDAO(), dao, "", "");
}

@Override
public void prepare(SearchIndex searchIndex) {
SearchService searchService = Entity.getEntity(searchIndex.getService(), "", ALL);
Expand Down
Loading