Skip to content

Commit

Permalink
Refactored concatenating
Browse files Browse the repository at this point in the history
  • Loading branch information
dobraczka committed Jul 16, 2024
1 parent ea3c0bf commit b107a4c
Show file tree
Hide file tree
Showing 25 changed files with 675 additions and 1,739 deletions.
6 changes: 0 additions & 6 deletions src/klinker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,12 @@

from .data import (
KlinkerBlockManager,
KlinkerDaskFrame,
KlinkerDataset,
KlinkerFrame,
NNBasedKlinkerBlockManager,
CompositeWithNNBasedKlinkerBlockManager,
)

__all__ = [
"KlinkerFrame",
"KlinkerPandasFrame",
"KlinkerDaskFrame",
"KlinkerPandasTripleFrame",
"KlinkerDataset",
"KlinkerBlockManager",
"NNBasedKlinkerBlockManager",
Expand Down
124 changes: 64 additions & 60 deletions src/klinker/blockers/attribute_clustering.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@
import pandas as pd
from nltk.tokenize import word_tokenize

from ..data import KlinkerFrame
from ..typing import FrameType
from ..data.blocks import KlinkerBlockManager
from klinker.data import KlinkerPandasFrame, KlinkerTriplePandasFrame
from ..encoders import TokenizedFrameEncoder, frame_encoder_resolver
from class_resolver import HintOrType, OptionalKwargs

Expand Down Expand Up @@ -68,14 +67,9 @@ def _conc_cluster_labels(
val_col="value",
label_col="cluster_label",
):
res = KlinkerTriplePandasFrame.from_df(
frame.merge(labels, left_on=frame_col, right_on=val_col)[
[id_col, frame_col, label_col]
],
id_col=frame.id_col,
table_name=frame.table_name,
)
return res
return frame.merge(labels, left_on=frame_col, right_on=val_col)[
[id_col, frame_col, label_col]
]

def _get_all_embeddings(self, left, right, value_col_name):
left_vals = left[value_col_name].unique()
Expand Down Expand Up @@ -116,13 +110,15 @@ def _save_labels_if_wanted(self, labels: np.ndarray):

def embed_and_cluster(
self,
left: KlinkerFrame,
right: KlinkerFrame,
left: FrameType,
right: FrameType,
left_table_name: str,
right_table_name: str,
value_col_name: str = "tail",
) -> Tuple[KlinkerFrame, KlinkerFrame]:
) -> Tuple[FrameType, FrameType]:
left_emb, right_emb = self._get_all_embeddings(left, right, value_col_name)
self._save_embeddings_if_wanted(
left_emb, right_emb, left.table_name, right.table_name
left_emb, right_emb, left_table_name, right_table_name
)
all_vec = left_emb._tensor_lib.concatenate(
[left_emb.vectors, right_emb.vectors]
Expand Down Expand Up @@ -190,12 +186,18 @@ def __init__(

def assign(
    self,
    left: FrameType,
    right: FrameType,
    left_rel: Optional[FrameType] = None,
    right_rel: Optional[FrameType] = None,
    left_id_col: str = "head",
    right_id_col: str = "head",
    left_table_name: str = "left",
    right_table_name: str = "right",
) -> KlinkerBlockManager:
    """Cluster the attribute values of both datasets and block on the result.

    Args:
    ----
        left: Contains entity attribute information of left dataset.
        right: Contains entity attribute information of right dataset.
        left_rel: Accepted for interface compatibility; not read here.
        right_rel: Accepted for interface compatibility; not read here.
        left_id_col: Accepted for interface compatibility; not read here.
        right_id_col: Accepted for interface compatibility; not read here.
        left_table_name: Forwarded to ``embed_and_cluster`` for the left side.
        right_table_name: Forwarded to ``embed_and_cluster`` for the right side.

    Returns:
    -------
        KlinkerBlockManager: result of the parent class's ``assign`` on the
        clustered frames.
    """
    clustered = self.embed_and_cluster(
        left, right, left_table_name, right_table_name
    )
    left_clustered, right_clustered = clustered
    # Only the two clustered frames are passed on to the parent's assign.
    return super().assign(left_clustered, right_clustered)


Expand Down Expand Up @@ -244,12 +246,18 @@ def __init__(

def assign(
    self,
    left: FrameType,
    right: FrameType,
    left_rel: Optional[FrameType] = None,
    right_rel: Optional[FrameType] = None,
    left_id_col: str = "head",
    right_id_col: str = "head",
    left_table_name: str = "left",
    right_table_name: str = "right",
) -> KlinkerBlockManager:
    """Embed and cluster both datasets, then delegate blocking to the parent.

    Args:
    ----
        left: Contains entity attribute information of left dataset.
        right: Contains entity attribute information of right dataset.
        left_rel: Part of the shared signature; unused in this method.
        right_rel: Part of the shared signature; unused in this method.
        left_id_col: Part of the shared signature; unused in this method.
        right_id_col: Part of the shared signature; unused in this method.
        left_table_name: Passed through to ``embed_and_cluster``.
        right_table_name: Passed through to ``embed_and_cluster``.

    Returns:
    -------
        KlinkerBlockManager: whatever the parent class's ``assign`` returns
        for the clustered frames.
    """
    cluster_pair = self.embed_and_cluster(
        left, right, left_table_name, right_table_name
    )
    left_clustered, right_clustered = cluster_pair
    # The parent's assign receives just the clustered left/right frames.
    return super().assign(left_clustered, right_clustered)


Expand Down Expand Up @@ -297,26 +305,24 @@ def __init__(

def assign(
    self,
    left: FrameType,
    right: FrameType,
    left_rel: Optional[FrameType] = None,
    right_rel: Optional[FrameType] = None,
    left_id_col: str = "head",
    right_id_col: str = "head",
    left_table_name: str = "left",
    right_table_name: str = "right",
) -> KlinkerBlockManager:
    """Tokenize, embed and cluster both datasets, then block on the clusters.

    Args:
    ----
        left: Contains entity attribute information of left dataset.
        right: Contains entity attribute information of right dataset.
        left_rel: Accepted for interface compatibility; not read here.
        right_rel: Accepted for interface compatibility; not read here.
        left_id_col: Accepted for interface compatibility; not read here.
        right_id_col: Accepted for interface compatibility; not read here.
        left_table_name: Name of the left token column that is renamed to
            ``"tail"``; also forwarded to ``embed_and_cluster``.
        right_table_name: Name of the right token column that is renamed to
            ``"tail"``; also forwarded to ``embed_and_cluster``.

    Returns:
    -------
        KlinkerBlockManager: result of the parent class's ``assign`` on the
        clustered token frames.
    """
    # NOTE(review): this relies on ``left``/``right`` exposing a
    # ``concat_values()`` method -- confirm plain FrameType inputs still
    # provide it after the refactoring.
    left_tok = self._inner_token_blocker._create_exploded_token_frame(
        left.fillna("").concat_values()
    ).rename(columns={left_table_name: "tail"})
    right_tok = self._inner_token_blocker._create_exploded_token_frame(
        right.fillna("").concat_values()
    ).rename(columns={right_table_name: "tail"})
    left_c, right_c = self.embed_and_cluster(
        left_tok, right_tok, left_table_name, right_table_name
    )
    return super().assign(left_c, right_c)


Expand Down Expand Up @@ -370,26 +376,24 @@ def __init__(

def assign(
    self,
    left: FrameType,
    right: FrameType,
    left_rel: Optional[FrameType] = None,
    right_rel: Optional[FrameType] = None,
    left_id_col: str = "head",
    right_id_col: str = "head",
    left_table_name: str = "left",
    right_table_name: str = "right",
) -> KlinkerBlockManager:
    """Tokenize both datasets, cluster the tokens and block on the clusters.

    Args:
    ----
        left: Contains entity attribute information of left dataset.
        right: Contains entity attribute information of right dataset.
        left_rel: Part of the shared signature; unused in this method.
        right_rel: Part of the shared signature; unused in this method.
        left_id_col: Part of the shared signature; unused in this method.
        right_id_col: Part of the shared signature; unused in this method.
        left_table_name: Column of the left token frame renamed to ``"tail"``;
            also passed to ``embed_and_cluster``.
        right_table_name: Column of the right token frame renamed to
            ``"tail"``; also passed to ``embed_and_cluster``.

    Returns:
    -------
        KlinkerBlockManager: whatever the parent class's ``assign`` returns
        for the clustered token frames.
    """
    explode = self._inner_token_blocker._create_exploded_token_frame

    def _tokenized(frame, table_name):
        # Missing values become empty strings before the values are
        # concatenated and exploded into tokens.
        exploded = explode(frame.fillna("").concat_values())
        return exploded.rename(columns={table_name: "tail"})

    left_tokens = _tokenized(left, left_table_name)
    right_tokens = _tokenized(right, right_table_name)
    left_clustered, right_clustered = self.embed_and_cluster(
        left_tokens, right_tokens, left_table_name, right_table_name
    )
    return super().assign(left_clustered, right_clustered)


Expand Down
78 changes: 50 additions & 28 deletions src/klinker/blockers/base.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import abc
import pandas as pd
import dask.dataframe as dd
from typing import Optional

from klinker.typing import SeriesType

from ..data import KlinkerBlockManager, KlinkerFrame
from ..data import KlinkerBlockManager
from .concat_utils import concat_values
from ..typing import FrameType, SeriesType


class Blocker(abc.ABC):
Expand All @@ -12,45 +14,53 @@ class Blocker(abc.ABC):
@abc.abstractmethod
def assign(
    self,
    left: FrameType,
    right: FrameType,
    left_rel: Optional[FrameType] = None,
    right_rel: Optional[FrameType] = None,
    left_id_col: str = "head",
    right_id_col: str = "head",
    left_table_name: str = "left",
    right_table_name: str = "right",
) -> KlinkerBlockManager:
    """Assign entity ids to blocks.

    Args:
    ----
        left: Contains entity attribute information of left dataset.
        right: Contains entity attribute information of right dataset.
        left_rel: Contains relational information of left dataset.
        right_rel: Contains relational information of right dataset.
        left_id_col: Name of the entity id column of the left dataset.
        right_id_col: Name of the entity id column of the right dataset.
        left_table_name: Name used to refer to the left dataset.
        right_table_name: Name used to refer to the right dataset.

    Returns:
    -------
        KlinkerBlockManager: instance holding the resulting blocks.
    """


class SchemaAgnosticBlocker(Blocker):
"""Base class for schema-agnostic Blockers."""
class AttributeConcatBlocker(Blocker):
"""Base class for Blockers that need to concatenate attribute info."""

@abc.abstractmethod
def _assign(
self,
left: SeriesType,
right: SeriesType,
left_rel: Optional[KlinkerFrame] = None,
right_rel: Optional[KlinkerFrame] = None,
left_rel: Optional[FrameType] = None,
right_rel: Optional[FrameType] = None,
left_id_col: str = "head",
right_id_col: str = "head",
left_table_name: str = "left",
right_table_name: str = "right",
) -> KlinkerBlockManager:
"""Assign entity ids to blocks.
Args:
----
left: SeriesType: concatenated entity attribute values of left dataset as series.
right: SeriesType: concatenated entity attribute values of left dataset as series.
left_rel: Optional[KlinkerFrame]: (Default value = None) Contains relational information of left dataset.
right_rel: Optional[KlinkerFrame]: (Default value = None) Contains relational information of left dataset.
left: concatenated entity attribute values of left dataset as series.
right: concatenated entity attribute values of right dataset as series.
left_rel: Contains relational information of left dataset.
right_rel: Contains relational information of right dataset.
Returns:
-------
Expand All @@ -59,30 +69,42 @@ def _assign(

def assign(
    self,
    left: FrameType,
    right: FrameType,
    left_rel: Optional[FrameType] = None,
    right_rel: Optional[FrameType] = None,
    left_id_col: str = "head",
    right_id_col: str = "head",
    left_table_name: str = "left",
    right_table_name: str = "right",
) -> KlinkerBlockManager:
    """Assign entity ids to blocks.

    Will concat all entity attribute information before proceeding.
    Inputs that already are a pandas or dask Series are passed on unchanged.

    Args:
    ----
        left: Contains entity attribute information of left dataset.
        right: Contains entity attribute information of right dataset.
        left_rel: Contains relational information of left dataset.
        right_rel: Contains relational information of right dataset.
        left_id_col: Id column of the left dataset; given to
            ``concat_values`` as ``id_col`` and forwarded to ``_assign``.
        right_id_col: Id column of the right dataset; given to
            ``concat_values`` as ``id_col`` and forwarded to ``_assign``.
        left_table_name: Forwarded to ``_assign``.
        right_table_name: Forwarded to ``_assign``.

    Returns:
    -------
        KlinkerBlockManager: instance holding the resulting blocks.
    """
    series_types = (pd.Series, dd.Series)
    # Decide per side, so an already concatenated Series never reaches
    # ``concat_values`` and both reduced names are always bound.
    left_reduced = (
        left
        if isinstance(left, series_types)
        else concat_values(left, id_col=left_id_col)
    )
    right_reduced = (
        right
        if isinstance(right, series_types)
        else concat_values(right, id_col=right_id_col)
    )
    return self._assign(
        left=left_reduced,
        right=right_reduced,
        left_rel=left_rel,
        right_rel=right_rel,
        left_id_col=left_id_col,
        right_id_col=right_id_col,
        left_table_name=left_table_name,
        right_table_name=right_table_name,
    )
Loading

0 comments on commit b107a4c

Please sign in to comment.