Commit

Add lineage and pii libraries
bulv1ne committed Jun 22, 2023
1 parent d2f877f commit fb9926e
Showing 12 changed files with 533 additions and 78 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -159,3 +159,6 @@ cython_debug/
# End of https://www.toptal.com/developers/gitignore/api/python

.databricks-connect

spark-warehouse/
metastore_db/
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -12,6 +12,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- New SSM module to fetch data from AWS parameter store
- Function drop_all_parameters_null_columns
- Function location_for_hive_table
- Lineage class to get downstream_tables
- pii.Producer and pii.Consumer for creating and handling PII removal requests

### Fixed
- DeltaTable.isDeltaTable doesn't seem to work with Unity Catalog,
the function `delta_utils.core.last_written_timestamp_for_delta_path` will now try to get the last timestamp
77 changes: 77 additions & 0 deletions delta_utils/lineage.py
@@ -0,0 +1,77 @@
from dataclasses import dataclass
from typing import Set

import requests


@dataclass
class Lineage:
"""
Represents a lineage object for tracking dependencies between tables in a Databricks workspace.
Args:
databricks_workspace_url (str): The URL of the Databricks workspace.
databricks_token (str): The access token for authentication with the Databricks API.
Methods:
downstream_tables(self, source_table: str) -> Set[str]:
Returns a set of table names that are dependent upon the specified source table.
Example:
```python
# Instantiate a Lineage object
lineage = Lineage(
databricks_workspace_url="https://example.databricks.com",
databricks_token="abc123",
)
# Get downstream tables for a source table
downstream_tables = lineage.downstream_tables("source_table")
```
"""

databricks_workspace_url: str
databricks_token: str

def downstream_tables(self, source_table: str) -> Set[str]:
"""
Retrieves a set of table names that are dependent upon the specified source table.
This method queries the Databricks workspace using the provided URL and access token to identify tables that
have a dependency on the specified source table.
Args:
source_table (str): The name of the source table.
Returns:
Set[str]: A set of table names that are dependent upon the source table.
Raises:
requests.exceptions.HTTPError:
If the source_table is not found or if there is an error retrieving the downstream tables.
Example:
```python
# Get downstream tables for a source table
downstream_tables = lineage.downstream_tables("source_table")
```
"""
resp = requests.get(
f"{self.databricks_workspace_url}/api/2.0/lineage-tracking/table-lineage",
json={
"table_name": source_table,
"inculude_entity_lineage": True,
},
headers={
"Accept": "application/json",
"Authorization": f"Bearer {self.databricks_token}",
},
)
resp.raise_for_status()

lineage_info = resp.json()
return {
"{catalog_name}.{schema_name}.{name}".format_map(row["tableInfo"])
for row in lineage_info.get("downstreams", [])
if "tableInfo" in row
}
213 changes: 213 additions & 0 deletions delta_utils/pii.py
@@ -0,0 +1,213 @@
"""
Warning:
Ask your Databricks administrators to set the environment variable `PII_TABLE` before you get started.
Example: `PII_TABLE=db_admin.gdpr.one_time_deletes`
"""

import os
import uuid
from dataclasses import dataclass
from datetime import datetime
from functools import reduce
from operator import and_
from typing import List, Optional, Tuple

from delta import DeltaTable
from pyspark.sql import DataFrame, SparkSession, functions as F, types as T

ONE_TIME_DELETES_SCHEMA = (
T.StructType()
.add("id", T.StringType(), False)
.add("created_at", T.TimestampType(), False)
.add("affected_table", T.StringType(), False)
.add("source_table", T.StringType(), False)
.add("source_columns", T.ArrayType(T.StringType()), True)
.add(
"source_identifying_attributes",
T.ArrayType(
T.StructType().add("column", T.StringType()).add("value", T.StringType())
),
False,
)
.add("when_to_delete", T.TimestampType(), True)
.add("deleted", T.BooleanType(), True)
)


def get_pii_table_name() -> str:
"""Return the fully qualified name of the PII removal-request table, read from the PII_TABLE environment variable."""
return os.getenv("PII_TABLE", "gdpr.one_time_deletes")


def create_pii_table(spark) -> DeltaTable:
"""Create the PII removal-request Delta table if it does not already exist, and return it."""
return (
DeltaTable.createIfNotExists(spark)
.tableName(get_pii_table_name())
.addColumns(ONE_TIME_DELETES_SCHEMA)
.execute()
)


def get_pii_table(spark) -> DeltaTable:
"""Return the existing PII removal-request Delta table."""
return DeltaTable.forName(spark, get_pii_table_name())


@dataclass
class Producer:
"""
Args:
spark (SparkSession): the active spark session
Example:
```python
# Create a removal request
producer = Producer(spark)
producer.create_removal_request(
affected_table="beta_live.world.adults_only",
source_table="alpha_live.world.people",
source_identifying_attributes=[("id", "1")],
)
```
"""

spark: SparkSession

def __post_init__(self):
self.dt = get_pii_table(self.spark)

def create_removal_request(
self,
*,
affected_table: str,
source_table: str,
source_columns: Optional[List[str]] = None,
source_identifying_attributes: List[Tuple[str, str]],
when_to_delete: Optional[datetime] = None,
):
"""
Creates a personally identifiable information (PII) removal request.
This method generates a request to remove PII from the specified affected table, using the source table and the
given identifying attributes to locate the affected records. The request can optionally include a specific date
and time for when the deletion should occur.
Args:
affected_table (str): The name of the affected table from which PII needs to be removed.
source_table (str): The name of the source table that contains the associated columns for identifying attributes.
source_columns (Optional[List[str]], optional): A list of column names in the source table that hold PII.
Defaults to None.
source_identifying_attributes (List[Tuple[str, str]]): A list of tuples representing the identifying attributes
to match the PII records in the affected table. Each tuple consists of a column name in the source table
and the value of the PII records.
when_to_delete (Optional[datetime], optional): An optional datetime object representing the specific date and
time when the PII deletion should occur. Defaults to None.
Raises:
ValueError: If the affected_table or source_table is not provided or if source_identifying_attributes is empty.
"""

if source_columns:
invalid_source_columns = ", ".join(
col for col in source_columns if "." in col
)
if invalid_source_columns:
raise ValueError(
f"Can't use the columns: {invalid_source_columns}. Only root columns can be used."
)
df_update = self.spark.createDataFrame(
[
(
str(uuid.uuid4()),
datetime.now(),
affected_table,
source_table,
source_columns,
source_identifying_attributes,
when_to_delete or datetime.utcnow(),
False,
)
],
schema=ONE_TIME_DELETES_SCHEMA,
)

# Create merge upsert condition
merge_attr = [
"affected_table",
"source_identifying_attributes",
"source_table",
"source_columns",
]
merge_statement = reduce(
and_,
[
F.col(f"source.{attr}") == F.col(f"updates.{attr}")
for attr in merge_attr
],
)

# Do upsert
(
self.dt.alias("source")
.merge(df_update.alias("updates"), merge_statement)
.whenMatchedUpdate(
set={
"created_at": "updates.created_at",
"when_to_delete": "updates.when_to_delete",
"deleted": "updates.deleted",
}
)
.whenNotMatchedInsertAll()
.execute()
)


@dataclass
class Consumer:
"""
Args:
spark (SparkSession): the active spark session
consumer (str): the consuming catalog name
Example:
```python
# Get all the removal requests and mark one as completed
consumer = Consumer(spark, "beta_live")
consumer.get_removal_requests().display()
# After handling the deletion of a request
consumer.mark_as_completed("abc123")
```
"""

spark: SparkSession
consumer: str

def __post_init__(self):
self.dt = get_pii_table(self.spark)

def get_removal_requests(self) -> DataFrame:
"""
Get all removal requests
Returns:
DataFrame: a DataFrame with all open removal requests for this consumer, ready to be displayed
"""
return (
self.dt.toDF()
.where(F.col("affected_table").startswith(f"{self.consumer}."))
.where(~F.col("deleted"))
.drop("deleted")
)

def mark_as_completed(self, id: str):
"""
Mark the removal request as completed
Args:
id (str): the UUID from the request
"""
self.dt.update(
F.col("id") == id,
set={"deleted": "true"},
)
3 changes: 3 additions & 0 deletions docs/lineage.md
@@ -0,0 +1,3 @@
# Lineage

::: delta_utils.lineage
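
## Example

A minimal usage sketch. It assumes the workspace URL and a personal access token are stored in a Databricks secret scope named `DATABRICKS`; adjust the lookups to wherever your credentials live.

```python
from delta_utils.lineage import Lineage

# Instantiate a Lineage object (credential lookups are placeholders)
lineage = Lineage(
    databricks_workspace_url=dbutils.secrets.get("DATABRICKS", "URL"),
    databricks_token=dbutils.secrets.get("DATABRICKS", "TOKEN"),
)

# Fully qualified names (catalog.schema.table) of every table downstream of the source
downstream_tables = lineage.downstream_tables("alpha_live.world.people")
print(downstream_tables)
```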
28 changes: 28 additions & 0 deletions docs/pii.md
@@ -0,0 +1,28 @@
# Personally Identifiable Information

::: delta_utils.pii
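
## Setting up the PII table

Before removal requests can be produced or consumed, the shared request table has to exist. A minimal setup sketch, assuming `PII_TABLE` is already set on the cluster and the job is allowed to create the table:

```python
from delta_utils.pii import create_pii_table, get_pii_table_name

# Create the Delta table named by PII_TABLE, if it does not already exist
create_pii_table(spark)
print(f"Removal requests are stored in {get_pii_table_name()}")
```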

## Using it together with lineage

```python
from delta_utils.lineage import Lineage
from delta_utils.pii import Producer

# Instantiate a Lineage object
lineage = Lineage(
databricks_workspace_url=dbutils.secrets.get("DATABRICKS", "URL"),
databricks_token=dbutils.secrets.get("DATABRICKS", "TOKEN"),
)

# Instantiate the Producer object
producer = Producer(spark)

source_table = "alpha_live.world.people"

# Get downstream tables for the alpha_live.world.people table
downstream_tables = lineage.downstream_tables(source_table)

for affected_table in downstream_tables:
producer.create_removal_request(
affected_table=affected_table,
source_table=source_table,
source_identifying_attributes=[("id", "1")],
)
```
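
## Handling removal requests

A sketch of how a consuming workspace might process open requests and mark them as completed. The deletion itself is written here as a plain Spark SQL `DELETE` built from the request's identifying attributes; how you actually remove the rows depends on your own tables, so treat this as an outline rather than built-in behaviour of the library.

```python
from delta_utils.pii import Consumer

consumer = Consumer(spark, "beta_live")

for request in consumer.get_removal_requests().collect():
    # Build a WHERE clause from the identifying attributes in the request
    predicate = " AND ".join(
        f"{attr['column']} = '{attr['value']}'"
        for attr in request["source_identifying_attributes"]
    )
    spark.sql(f"DELETE FROM {request['affected_table']} WHERE {predicate}")

    # Record that the request has been handled
    consumer.mark_as_completed(request["id"])
```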
2 changes: 2 additions & 0 deletions mkdocs.yml
@@ -11,9 +11,11 @@ nav:
- 'clean.md'
- 'AWS':
- 'ssm.md'
- 'pii.md'
- 'Other':
- 'geocoding.md'
- 'core.md'
- 'lineage.md'
- 'About':
- Issue Tracker: 'https://github.com/husqvarnagroup/delta_utils/issues'
- 'CHANGELOG.md'