working on lineage es integration #14181

Merged
merged 65 commits into from
Jan 9, 2024
Commits
f880e28  working on lineage es integration (07Himank, Nov 30, 2023)
2ca0d14  os change (07Himank, Nov 30, 2023)
7e463b7  Merge branch 'main' into lineageEsIntegration (karanh37, Dec 8, 2023)
6c70458  Merge branch 'main' into lineageEsIntegration (07Himank, Dec 8, 2023)
aa2cf35  working (07Himank, Dec 8, 2023)
c0d3269  working on lineage es integration (07Himank, Dec 8, 2023)
9db0635  Merge branch 'main' into lineageEsIntegration (07Himank, Dec 8, 2023)
e7c79ab  add curremt lineage, special character handling, delete lineage (07Himank, Dec 12, 2023)
f716d18  Merge branch 'lineageEsIntegration' of https://github.com/open-metada… (07Himank, Dec 12, 2023)
4a1942d  remove test code (07Himank, Dec 12, 2023)
8cc9868  update lineage index work done (07Himank, Dec 13, 2023)
83347d7  fix: add lineage ui changes (karanh37, Dec 14, 2023)
1dd5fa3  fix: lineage add edit (karanh37, Dec 14, 2023)
dd6ee2a  fix: center on load (karanh37, Dec 14, 2023)
79f5357  fix: add remove edges (karanh37, Dec 18, 2023)
9499b6d  Merge branch 'main' into lineageEsIntegration (karanh37, Dec 18, 2023)
563c07e  fix: column lineage issues (karanh37, Dec 18, 2023)
6467ddb  use reactflow controls (karanh37, Dec 19, 2023)
06451f2  add collapse (karanh37, Dec 19, 2023)
5f8c9ba  added upstream and downstream depth diffrently (07Himank, Dec 19, 2023)
d0d703b  resolved conflicts (07Himank, Dec 19, 2023)
f91521c  depth and collapse (karanh37, Dec 19, 2023)
f47f7cf  fix loading of dialog (karanh37, Dec 19, 2023)
6e51586  add includeDeleted in API (karanh37, Dec 19, 2023)
0036951  cleanup (karanh37, Dec 20, 2023)
9bff7d0  Merge branch 'lineageEsIntegration' of https://github.com/open-metada… (07Himank, Dec 20, 2023)
4c9a75d  cleanup (karanh37, Dec 20, 2023)
68b3a7f  cleanup (karanh37, Dec 20, 2023)
1f82245  initial unit tests (karanh37, Dec 21, 2023)
618cf74  Merge branch 'lineageEsIntegration' of https://github.com/open-metada… (07Himank, Dec 22, 2023)
bc4535d  main merge (07Himank, Dec 22, 2023)
ac62f3f  fix tests (karanh37, Dec 22, 2023)
be25483  cleanup (karanh37, Dec 22, 2023)
35f646b  Merge branch 'main' into lineageEsIntegration (karanh37, Jan 1, 2024)
f248598  update filter query (karanh37, Jan 1, 2024)
87d57a6  Merge branch 'main' of https://github.com/open-metadata/OpenMetadata … (07Himank, Jan 2, 2024)
d98bee1  fix: allow sql query to update (karanh37, Jan 2, 2024)
ce56981  use input instead of select (karanh37, Jan 2, 2024)
cc34ca6  unit tests (karanh37, Jan 2, 2024)
62173fa  Merge branch 'main' into lineageEsIntegration (karanh37, Jan 2, 2024)
ccbdbb3  unit tests (karanh37, Jan 2, 2024)
098aee4  fix edge removal (karanh37, Jan 3, 2024)
ade1383  cleanup (karanh37, Jan 3, 2024)
3c0ff57  description and deleted (07Himank, Jan 4, 2024)
ed8296a  Merge branch 'lineageEsIntegration' of https://github.com/open-metada… (07Himank, Jan 4, 2024)
9365069  fix: initial lineage tests (karanh37, Jan 4, 2024)
ae26c1e  Merge branch 'main' into lineageEsIntegration (karanh37, Jan 4, 2024)
c7289d3  fix unit tests (karanh37, Jan 4, 2024)
b7c5ef0  Merge branch 'main' into lineageEsIntegration (karanh37, Jan 5, 2024)
083b0f3  fix: review comments (karanh37, Jan 7, 2024)
e7cf223  Merge branch 'main' into lineageEsIntegration (karanh37, Jan 7, 2024)
1ba671f  added edgedescription (07Himank, Jan 8, 2024)
97e9f2a  Merge branch 'lineageEsIntegration' of https://github.com/open-metada… (07Himank, Jan 8, 2024)
5761b7b  refactor custom nodes (karanh37, Jan 8, 2024)
fea3185  refactor condition (karanh37, Jan 8, 2024)
0554029  Merge branch 'main' into lineageEsIntegration (karanh37, Jan 8, 2024)
26757ab  add pipeline status color (karanh37, Jan 8, 2024)
9437778  fix unit tests (karanh37, Jan 8, 2024)
f1e3f6f  Merge branch 'main' into lineageEsIntegration (karanh37, Jan 8, 2024)
6a3aaad  cleanup (karanh37, Jan 8, 2024)
795d23e  cleanup (karanh37, Jan 9, 2024)
1075013  added opensearch code and changed endpoint (07Himank, Jan 9, 2024)
015b7ac  Merge branch 'lineageEsIntegration' of https://github.com/open-metada… (07Himank, Jan 9, 2024)
b7ccc3c  fix: lineage url (karanh37, Jan 9, 2024)
0da8c79  depth issue resolved (07Himank, Jan 9, 2024)
@@ -13,11 +13,19 @@

package org.openmetadata.service.jdbi3;

import static org.openmetadata.service.search.SearchClient.GLOBAL_SEARCH_ALIAS;
import static org.openmetadata.service.search.SearchClient.REMOVE_LINEAGE_SCRIPT;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.ColumnsEntityInterface;
import org.openmetadata.schema.api.lineage.AddLineage;
import org.openmetadata.schema.entity.data.Table;
@@ -30,13 +38,17 @@
import org.openmetadata.schema.type.Relationship;
import org.openmetadata.service.Entity;
import org.openmetadata.service.jdbi3.CollectionDAO.EntityRelationshipRecord;
import org.openmetadata.service.search.SearchClient;
import org.openmetadata.service.search.models.IndexMapping;
import org.openmetadata.service.util.FullyQualifiedName;
import org.openmetadata.service.util.JsonUtils;

@Repository
public class LineageRepository {
private final CollectionDAO dao;

public SearchClient searchClient = Entity.getSearchRepository().getSearchClient();

public LineageRepository() {
this.dao = Entity.getCollectionDAO();
Entity.setLineageRepository(this);
@@ -88,6 +100,72 @@ public void addLineage(AddLineage addLineage) {
to.getType(),
Relationship.UPSTREAM.ordinal(),
detailsJson);
addLineageToSearch(from, to, addLineage.getEdge().getLineageDetails());
}

private void addLineageToSearch(
EntityReference fromEntity, EntityReference toEntity, LineageDetails lineageDetails) {
IndexMapping sourceIndexMapping =
Entity.getSearchRepository().getIndexMapping(fromEntity.getType());
String sourceIndexName =
sourceIndexMapping.getIndexName(Entity.getSearchRepository().getClusterAlias());
IndexMapping destinationIndexMapping =
Entity.getSearchRepository().getIndexMapping(toEntity.getType());
String destinationIndexName =
destinationIndexMapping.getIndexName(Entity.getSearchRepository().getClusterAlias());
Map<String, Object> relationshipDetails = new HashMap<>();
Pair<String, String> from = new ImmutablePair<>("_id", fromEntity.getId().toString());
Pair<String, String> to = new ImmutablePair<>("_id", toEntity.getId().toString());
processLineageData(fromEntity, toEntity, lineageDetails, relationshipDetails);
searchClient.updateLineage(sourceIndexName, from, relationshipDetails);
searchClient.updateLineage(destinationIndexName, to, relationshipDetails);
}

private void processLineageData(
EntityReference fromEntity,
EntityReference toEntity,
LineageDetails lineageDetails,
Map<String, Object> relationshipDetails) {
Map<String, Object> fromDetails = new HashMap<>();
Map<String, Object> toDetails = new HashMap<>();
fromDetails.put("id", fromEntity.getId().toString());
fromDetails.put("type", fromEntity.getType());
fromDetails.put("fqn", fromEntity.getFullyQualifiedName());
toDetails.put("id", toEntity.getId().toString());
toDetails.put("type", toEntity.getType());
toDetails.put("fqn", toEntity.getFullyQualifiedName());
relationshipDetails.put(
"doc_id", fromEntity.getId().toString() + "-" + toEntity.getId().toString());
relationshipDetails.put("fromEntity", fromDetails);
relationshipDetails.put("toEntity", toDetails);
if (lineageDetails != null) {
relationshipDetails.put(
"pipeline",
Collaborator: this could be pipeline or stored procedure as an edge cc @pmbrull

Collaborator: @07Himank need to address this, it can be pipeline or stored procedure here

Contributor Author: @harshach just the key name is "pipeline", but it works for both pipeline and stored procedure. Do you want me to change the key name?
JsonUtils.getMap(
CommonUtil.nullOrEmpty(lineageDetails.getPipeline())
? null
: lineageDetails.getPipeline()));
relationshipDetails.put(
"description",
CommonUtil.nullOrEmpty(lineageDetails.getDescription())
? null
: lineageDetails.getDescription());
if (!CommonUtil.nullOrEmpty(lineageDetails.getColumnsLineage())) {
List<Map<String, Object>> columnLineageList = new ArrayList<>();
for (ColumnLineage columnLineage : lineageDetails.getColumnsLineage()) {
columnLineageList.add(JsonUtils.getMap(columnLineage));
}
relationshipDetails.put("columns", columnLineageList);
}
relationshipDetails.put(
"sqlQuery",
CommonUtil.nullOrEmpty(lineageDetails.getSqlQuery())
? null
: lineageDetails.getSqlQuery());
relationshipDetails.put(
"source",
CommonUtil.nullOrEmpty(lineageDetails.getSource()) ? null : lineageDetails.getSource());
}
}
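To make the stored shape concrete, here is a minimal, hypothetical sketch (the class and method names are not part of the PR) of the lineage entry that `processLineageData` assembles: `doc_id` is the composite `<fromId>-<toId>` pair, and each endpoint carries its `id` and `fqn` (the real code also stores `type`, plus optional pipeline, description, columns, sqlQuery, and source fields).

```java
import java.util.HashMap;
import java.util.Map;

public class LineageDocSketch {
    // Builds a lineage entry in the same shape processLineageData() produces.
    static Map<String, Object> buildEntry(String fromId, String toId, String fromFqn, String toFqn) {
        Map<String, Object> from = new HashMap<>();
        from.put("id", fromId);
        from.put("fqn", fromFqn);
        Map<String, Object> to = new HashMap<>();
        to.put("id", toId);
        to.put("fqn", toFqn);
        Map<String, Object> entry = new HashMap<>();
        // doc_id is the composite "<fromId>-<toId>" used later to match the
        // edge when updating or deleting it from the search index.
        entry.put("doc_id", fromId + "-" + toId);
        entry.put("fromEntity", from);
        entry.put("toEntity", to);
        return entry;
    }
}
```

The same entry is written to both the source and destination index documents, which is what lets the traversal query from either direction.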

private String validateLineageDetails(
@@ -143,14 +221,30 @@ public boolean deleteLineage(String fromEntity, String fromId, String toEntity,
Entity.getEntityReferenceById(toEntity, UUID.fromString(toId), Include.NON_DELETED);

// Finally, delete lineage relationship
return dao.relationshipDAO()
.delete(
from.getId(),
from.getType(),
to.getId(),
to.getType(),
Relationship.UPSTREAM.ordinal())
> 0;
boolean result =
dao.relationshipDAO()
.delete(
from.getId(),
from.getType(),
to.getId(),
to.getType(),
Relationship.UPSTREAM.ordinal())
> 0;
deleteLineageFromSearch(from, to);
return result;
}

private void deleteLineageFromSearch(EntityReference fromEntity, EntityReference toEntity) {
searchClient.updateChildren(
GLOBAL_SEARCH_ALIAS,
new ImmutablePair<>(
"lineage.doc_id.keyword",
fromEntity.getId().toString() + "-" + toEntity.getId().toString()),
new ImmutablePair<>(
String.format(
REMOVE_LINEAGE_SCRIPT,
fromEntity.getId().toString() + "-" + toEntity.getId().toString()),
null));
}
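The deletion path parameterizes the painless script with the same composite `doc_id`. A standalone sketch of just the string formatting (the helper name is invented for illustration):

```java
public class RemoveLineageScriptSketch {
    // Same shape as SearchClient.REMOVE_LINEAGE_SCRIPT: the composite
    // "<fromId>-<toId>" doc_id is substituted into the painless source.
    static final String REMOVE_LINEAGE_SCRIPT =
        "for (int i = 0; i < ctx._source.lineage.length; i++) "
            + "{ if (ctx._source.lineage[i].doc_id == '%s') { ctx._source.lineage.remove(i) }}";

    static String buildScript(String fromId, String toId) {
        return String.format(REMOVE_LINEAGE_SCRIPT, fromId + "-" + toId);
    }
}
```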

private EntityLineage getLineage(
@@ -15,13 +15,15 @@

import static javax.ws.rs.core.Response.Status.NOT_FOUND;

import es.org.elasticsearch.action.search.SearchResponse;
import io.dropwizard.jersey.errors.ErrorMessage;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import java.io.IOException;
import java.util.List;
import javax.validation.Valid;
import javax.validation.constraints.Max;
@@ -165,6 +167,41 @@ public EntityLineage getByName(
return addHref(uriInfo, dao.getByName(entity, fqn, upstreamDepth, downStreamDepth));
}

@GET
@Path("/getLineage")
@Operation(
operationId = "searchLineage",
summary = "Search lineage",
responses = {
@ApiResponse(
responseCode = "200",
description = "search response",
content =
@Content(
mediaType = "application/json",
schema = @Schema(implementation = SearchResponse.class)))
})
public Response searchLineage(
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Parameter(description = "fqn") @QueryParam("fqn") String fqn,
@Parameter(description = "upstreamDepth") @QueryParam("upstreamDepth") int upstreamDepth,
@Parameter(description = "downstreamDepth") @QueryParam("downstreamDepth")
int downstreamDepth,
@Parameter(
description =
"Elasticsearch query that will be combined with the query_string query generator from the `query` argument")
@QueryParam("query_filter")
String queryFilter,
@Parameter(description = "Filter documents by deleted param. By default deleted is false")
@QueryParam("includeDeleted")
boolean deleted)
throws IOException {

return Entity.getSearchRepository()
.searchLineage(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted);
}
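A caller would hit this endpoint with the fqn plus the two depth parameters. A hedged sketch of the query-string construction, assuming the resource is mounted under `/api/v1/lineage` (the base path is an assumption, not stated in this diff):

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class LineageUrlSketch {
    // Assumed base path for illustration; only the query parameter names
    // (fqn, upstreamDepth, downstreamDepth, includeDeleted) come from the PR.
    static String lineageUrl(String fqn, int upstreamDepth, int downstreamDepth, boolean includeDeleted) {
        return "/api/v1/lineage/getLineage?fqn="
            + URLEncoder.encode(fqn, StandardCharsets.UTF_8)
            + "&upstreamDepth=" + upstreamDepth
            + "&downstreamDepth=" + downstreamDepth
            + "&includeDeleted=" + includeDeleted;
    }
}
```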

@PUT
@Operation(
operationId = "addLineageEdge",
@@ -44,6 +44,11 @@ public interface SearchClient {
String REMOVE_TAGS_CHILDREN_SCRIPT =
"for (int i = 0; i < ctx._source.tags.length; i++) { if (ctx._source.tags[i].tagFQN == '%s') { ctx._source.tags.remove(i) }}";

String REMOVE_LINEAGE_SCRIPT =
"for (int i = 0; i < ctx._source.lineage.length; i++) { if (ctx._source.lineage[i].doc_id == '%s') { ctx._source.lineage.remove(i) }}";

String ADD_UPDATE_LINEAGE =
"boolean docIdExists = false; for (int i = 0; i < ctx._source.lineage.size(); i++) { if (ctx._source.lineage[i].doc_id.equalsIgnoreCase(params.lineageData.doc_id)) { ctx._source.lineage[i] = params.lineageData; docIdExists = true; break;}}if (!docIdExists) {ctx._source.lineage.add(params.lineageData);}";
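The `ADD_UPDATE_LINEAGE` painless script is an upsert: it replaces the lineage entry whose `doc_id` matches (case-insensitively) or appends a new one. The same logic mirrored in plain Java, as a sketch (class and method names are illustrative):

```java
import java.util.List;
import java.util.Map;

public class LineageUpsertSketch {
    // Mirrors ADD_UPDATE_LINEAGE: replace the entry with a matching doc_id
    // (case-insensitive), otherwise append the new entry.
    static void upsert(List<Map<String, Object>> lineage, Map<String, Object> entry) {
        String docId = (String) entry.get("doc_id");
        for (int i = 0; i < lineage.size(); i++) {
            if (((String) lineage.get(i).get("doc_id")).equalsIgnoreCase(docId)) {
                lineage.set(i, entry);
                return;
            }
        }
        lineage.add(entry);
    }
}
```

This is why re-adding an existing edge does not duplicate it in the index document.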
String UPDATE_ADDED_DELETE_GLOSSARY_TAGS =
"if (ctx._source.tags != null) { for (int i = ctx._source.tags.size() - 1; i >= 0; i--) { if (params.tagDeleted != null) { for (int j = 0; j < params.tagDeleted.size(); j++) { if (ctx._source.tags[i].tagFQN.equalsIgnoreCase(params.tagDeleted[j].tagFQN)) { ctx._source.tags.remove(i); } } } } } if (ctx._source.tags == null) { ctx._source.tags = []; } if (params.tagAdded != null) { ctx._source.tags.addAll(params.tagAdded); } ctx._source.tags = ctx._source.tags .stream() .distinct() .sorted((o1, o2) -> o1.tagFQN.compareTo(o2.tagFQN)) .collect(Collectors.toList());";
String REMOVE_TEST_SUITE_CHILDREN_SCRIPT =
@@ -67,6 +72,10 @@

Response searchBySourceUrl(String sourceUrl) throws IOException;

Response searchLineage(
String fqn, int upstreamDepth, int downstreamDepth, String queryFilter, boolean deleted)
throws IOException;

Response searchByField(String fieldName, String fieldValue, String index) throws IOException;

Response aggregate(String index, String fieldName, String value, String query) throws IOException;
@@ -95,6 +104,9 @@ void updateChildren(
Pair<String, String> fieldAndValue,
Pair<String, Map<String, Object>> updates);

void updateLineage(
String indexName, Pair<String, String> fieldAndValue, Map<String, Object> lineageData);

TreeMap<Long, List<Object>> getSortedDate(
String team,
Long scheduleTime,
@@ -637,6 +637,12 @@ public Response searchBySourceUrl(String sourceUrl) throws IOException {
return searchClient.searchBySourceUrl(sourceUrl);
}

public Response searchLineage(
String fqn, int upstreamDepth, int downstreamDepth, String queryFilter, boolean deleted)
throws IOException {
return searchClient.searchLineage(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted);
}

public Response searchByField(String fieldName, String fieldValue, String index)
throws IOException {
return searchClient.searchByField(fieldName, fieldValue, index);
@@ -111,6 +111,7 @@
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.DataInsightInterface;
import org.openmetadata.schema.dataInsight.DataInsightChartResult;
import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
@@ -419,6 +420,95 @@ public Response search(SearchRequest request) throws IOException {
return Response.status(OK).entity(response).build();
}

@Override
public Response searchLineage(
String fqn, int upstreamDepth, int downstreamDepth, String queryFilter, boolean deleted)
throws IOException {
Map<String, Object> responseMap = new HashMap<>();
List<Map<String, Object>> edges = new ArrayList<>();
Set<Map<String, Object>> nodes = new HashSet<>();
es.org.elasticsearch.action.search.SearchRequest searchRequest =
new es.org.elasticsearch.action.search.SearchRequest(GLOBAL_SEARCH_ALIAS);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(
QueryBuilders.boolQuery().must(QueryBuilders.termQuery("fullyQualifiedName", fqn)));
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
for (var hit : searchResponse.getHits().getHits()) {
responseMap.put("entity", hit.getSourceAsMap());
}
getLineage(
fqn, downstreamDepth, edges, nodes, queryFilter, "lineage.fromEntity.fqn.keyword", deleted);
getLineage(
fqn, upstreamDepth, edges, nodes, queryFilter, "lineage.toEntity.fqn.keyword", deleted);
responseMap.put("edges", edges);
responseMap.put("nodes", nodes);
return Response.status(OK).entity(responseMap).build();
}

private void getLineage(
String fqn,
int depth,
List<Map<String, Object>> edges,
Set<Map<String, Object>> nodes,
String queryFilter,
String direction,
boolean deleted)
throws IOException {
if (depth <= 0) {
return;
}
es.org.elasticsearch.action.search.SearchRequest searchRequest =
Collaborator: can we move this into import

Contributor Author: @harshach We have two SearchRequest classes, one of them internal; that is why we use the fully qualified name here.
new es.org.elasticsearch.action.search.SearchRequest(GLOBAL_SEARCH_ALIAS);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(
QueryBuilders.boolQuery().must(QueryBuilders.termQuery(direction, fqn)));
if (CommonUtil.nullOrEmpty(deleted)) {
searchSourceBuilder.query(
QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery(direction, fqn))
.must(QueryBuilders.termQuery("deleted", deleted)));
}
if (!nullOrEmpty(queryFilter) && !queryFilter.equals("{}")) {
try {
XContentParser filterParser =
XContentType.JSON
.xContent()
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, queryFilter);
QueryBuilder filter = SearchSourceBuilder.fromXContent(filterParser).query();
BoolQueryBuilder newQuery =
QueryBuilders.boolQuery().must(searchSourceBuilder.query()).filter(filter);
searchSourceBuilder.query(newQuery);
} catch (Exception ex) {
LOG.warn("Error parsing query_filter from query parameters, ignoring filter", ex);
}
}
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
for (var hit : searchResponse.getHits().getHits()) {
List<Map<String, Object>> lineage =
(List<Map<String, Object>>) hit.getSourceAsMap().get("lineage");
nodes.add(hit.getSourceAsMap());
for (Map<String, Object> lin : lineage) {
HashMap<String, String> fromEntity = (HashMap<String, String>) lin.get("fromEntity");
HashMap<String, String> toEntity = (HashMap<String, String>) lin.get("toEntity");
if (direction.equalsIgnoreCase("lineage.fromEntity.fqn.keyword")) {
if (!edges.contains(lin) && fromEntity.get("fqn").equals(fqn)) {
edges.add(lin);
getLineage(
toEntity.get("fqn"), depth - 1, edges, nodes, queryFilter, direction, deleted);
}
} else {
if (!edges.contains(lin) && toEntity.get("fqn").equals(fqn)) {
edges.add(lin);
getLineage(
fromEntity.get("fqn"), depth - 1, edges, nodes, queryFilter, direction, deleted);
}
}
}
}
}
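The recursion above bounds the traversal by depth: each hop decrements `depth`, the walk stops at zero, and edges already collected are skipped so cycles do not recurse forever. A simplified in-memory sketch of that traversal (names invented for illustration; the real code queries Elasticsearch per hop):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LineageWalkSketch {
    // Depth-limited walk over lineage edges, mirroring getLineage():
    // follow edges whose "from" fqn matches, recurse on the "to" side,
    // and skip edges already collected to avoid cycles.
    static void walk(String fqn, int depth, Map<String, List<String[]>> edgesByFrom,
                     List<String[]> collected) {
        if (depth <= 0) {
            return;
        }
        for (String[] edge : edgesByFrom.getOrDefault(fqn, List.of())) {
            if (!collected.contains(edge)) {
                collected.add(edge);
                walk(edge[1], depth - 1, edgesByFrom, collected);
            }
        }
    }
}
```

The upstream direction is the same walk with the edge map keyed on the "to" side, which is why the resource takes `upstreamDepth` and `downstreamDepth` separately.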

@Override
public Response searchBySourceUrl(String sourceUrl) throws IOException {
es.org.elasticsearch.action.search.SearchRequest searchRequest =
@@ -1136,6 +1226,26 @@ public void updateChildren(
}
}

/**
 * Upserts the given lineage entry on every document in {@code indexName}
 * matched by {@code fieldAndValue}, via the ADD_UPDATE_LINEAGE script.
 */
@Override
public void updateLineage(
String indexName, Pair<String, String> fieldAndValue, Map<String, Object> lineageData) {
if (isClientAvailable) {
UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(indexName);
updateByQueryRequest.setQuery(
new MatchQueryBuilder(fieldAndValue.getKey(), fieldAndValue.getValue())
.operator(Operator.AND));
Map<String, Object> params = Collections.singletonMap("lineageData", lineageData);
Script script =
new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, ADD_UPDATE_LINEAGE, params);
updateByQueryRequest.setScript(script);
updateElasticSearchByQuery(updateByQueryRequest);
}
}

public void updateElasticSearch(UpdateRequest updateRequest) {
if (updateRequest != null && isClientAvailable) {
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
@@ -63,6 +63,7 @@ public Map<String, Object> buildESDoc() {
doc.put("column_suggest", columnSuggest);
doc.put("entityType", Entity.CONTAINER);
doc.put("serviceType", container.getServiceType());
doc.put("lineage", SearchIndex.getLineageData(container.getEntityReference()));
doc.put(
"fqnParts",
getFQNParts(