Fixes #5448: Implement initial Iceberg Connector using PyIceberg #14825

Merged
merged 36 commits into from
Jan 29, 2024
Changes from 8 commits
43a717f
Create the iceberg connection schema
IceS2 Jan 18, 2024
647b72f
Link the IcebergConnection configuration with the forms on the UI
IceS2 Jan 18, 2024
26acda5
Add the pyiceberg dependency on the ingestion package
IceS2 Jan 18, 2024
60ac93d
Create the get_connection and test_connection functions
IceS2 Jan 18, 2024
1047f4a
First iteration on the iceberg ingestion logic
IceS2 Jan 19, 2024
c2d764b
Add A more comprehensive implementation of the Iceberg Source
IceS2 Jan 23, 2024
6d2777d
Add UnitTests
IceS2 Jan 23, 2024
f02d937
Update icebergConnection definition
IceS2 Jan 23, 2024
079e052
Update the iceberg souce code based on new schema
IceS2 Jan 25, 2024
c56232f
Updated icebergConnecgtion schema for simplicity and to be able to co…
IceS2 Jan 25, 2024
373aa39
Updated setup dependencies to be more flexible
IceS2 Jan 25, 2024
55b9b21
Merge branch 'main' into issue-5448-iceberg-connector
IceS2 Jan 25, 2024
b882f58
Updated get_owner_ref logic
IceS2 Jan 25, 2024
0563d78
Fix formatting
IceS2 Jan 25, 2024
54712a6
Changed the icebergConnection json schema structure to enable the Cla…
IceS2 Jan 25, 2024
e20b58e
Add the IcebergCatalog and IcebergFileSystem ClassConverters
IceS2 Jan 25, 2024
f157980
Refactor the code to take into account the new jsonSchema structure
IceS2 Jan 25, 2024
15d2e4d
Fix formatting
IceS2 Jan 25, 2024
7c1f4a2
Add Documentation for the Iceberg Connector
IceS2 Jan 25, 2024
2d6ebfc
Fix Menu order for Iceberg
IceS2 Jan 26, 2024
fb091ba
ui: add Iceberg service icon and constant
Sachin-chaurasiya Jan 26, 2024
af960ed
Fix DynamoDb Catalog issue due to how PyIceberg instantes it
IceS2 Jan 26, 2024
9c65960
Changed uri title to URI
IceS2 Jan 26, 2024
2a7df35
Fix ClassConverter for Iceberg
IceS2 Jan 26, 2024
915ec02
Fix GetSecretValue for password types
IceS2 Jan 26, 2024
2801dcf
Fix formatting
IceS2 Jan 26, 2024
79dc0dd
Fix formatting
IceS2 Jan 26, 2024
84ed783
Add Iceberg Connector Images for the docs
IceS2 Jan 26, 2024
dd1e6c1
Add pylint disable for Hacky super() call
IceS2 Jan 26, 2024
cd67f76
Add Iceberg.md for the UI docs
IceS2 Jan 26, 2024
0632639
Fix pylint complaint
IceS2 Jan 26, 2024
3a012bf
Fix pylint complaint
IceS2 Jan 26, 2024
2b1fec2
Fix UnitTests
IceS2 Jan 26, 2024
92dde13
fix type error and unit tests
Sachin-chaurasiya Jan 26, 2024
5787fdb
update pipeline type checks
Sachin-chaurasiya Jan 26, 2024
93a7f04
Fix Sonar Cloud complaints
IceS2 Jan 26, 2024
16 changes: 13 additions & 3 deletions ingestion/setup.py
Original file line number Diff line number Diff line change
@@ -20,16 +20,19 @@
# Add here versions required for multiple plugins
VERSIONS = {
"airflow": "apache-airflow==2.7.3",
"adlfs": "adlfs==2022.11.2", # Python 3.7 does only support up to 2022.2.0
"avro": "avro~=1.11",
"boto3": "boto3>=1.20,<2.0", # No need to add botocore separately. It's a dep from boto3
"geoalchemy2": "GeoAlchemy2~=0.12",
"google-cloud-storage": "google-cloud-storage==1.43.0",
"gcsfs": "gcsfs==2022.11.0",
"great-expectations": "great-expectations~=0.18.0",
"grpc-tools": "grpcio-tools>=1.47.2",
"msal": "msal~=1.2",
"neo4j": "neo4j~=5.3.0",
"pandas": "pandas<=2,<3",
"pyarrow": "pyarrow~=14.0",
"pydantic": "pydantic==1.10.13",
"pydomo": "pydomo~=0.3",
"pymysql": "pymysql>=1.0.2",
"pyodbc": "pyodbc>=4.0.35,<5",
@@ -106,7 +109,7 @@
"jsonschema",
"memory-profiler",
"mypy_extensions>=0.4.3",
- "pydantic~=1.10",
+ VERSIONS["pydantic"],
VERSIONS["pymysql"],
"python-dateutil>=2.8.1",
"python-jose~=3.3",
@@ -161,12 +164,12 @@
"datalake-azure": {
VERSIONS["azure-storage-blob"],
VERSIONS["azure-identity"],
- "adlfs>=2022.2.0", # Python 3.7 does only support up to 2022.2.0
+ VERSIONS["adlfs"], # Python 3.7 does only support up to 2022.2.0
*COMMONS["datalake"],
},
"datalake-gcs": {
VERSIONS["google-cloud-storage"],
- "gcsfs==2022.11.0",
+ VERSIONS["gcsfs"],
*COMMONS["datalake"],
},
*COMMONS["datalake"],
},
"datalake-s3": {
@@ -197,6 +200,13 @@
"thrift-sasl~=0.4",
"impyla~=0.18.0",
},
"iceberg": {
"pyiceberg",
# Forcing the version of a few packages so it plays nicely with other requirements.
VERSIONS["pydantic"],
VERSIONS["adlfs"],
VERSIONS["gcsfs"],
},
"impala": {
"presto-types-parser>=0.0.2",
"impyla[kerberos]~=0.18.0",
Contributor

What do you think of making IcebergCatalogFactory either an instantiable class or just breaking it down with catalog_type_map being a global and from_connection being a simple function? I understand the intent but we don't really need that object if we won't use it as an object.

In that context, do you see any advantage in defining this piece of code directly in __init__.py?

Contributor Author

Hey, to be honest I don't have a really strong opinion... I've been messing around with both ways, actually xD

I believe that wrapping things in specific classes tends to make the code a bit more readable because it gives you context straight away, but if we avoid doing that in this project I wouldn't mind having just a from_connection function.

As for defining it in __init__.py, it made sense to me because IcebergCatalogFactory is basically the entry point for the submodule.

Again, if it's against our common practices, I wouldn't mind defining it in another file; to make the import statement more direct, we can "re-export" it in __init__.py.
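
A minimal sketch of the function-based alternative discussed in this thread. It is not code from the PR: `RestConn`, `HiveConn`, `CatalogConfig`, `load_rest_stub` and `load_hive_stub` are hypothetical stand-ins for the generated connection models and the PyIceberg-backed catalog classes, so that the example runs without either dependency. The mapping becomes a module-level constant and `from_connection` a plain function.

```python
from dataclasses import dataclass
from typing import Callable, Dict


# Hypothetical stand-ins for the generated connection models.
@dataclass
class RestConn:
    uri: str


@dataclass
class HiveConn:
    uri: str


@dataclass
class CatalogConfig:
    name: str
    type: object  # one of the connection models above


# Hypothetical loaders; the real ones would return a PyIceberg Catalog.
def load_rest_stub(catalog: CatalogConfig) -> str:
    return f"rest:{catalog.name}"


def load_hive_stub(catalog: CatalogConfig) -> str:
    return f"hive:{catalog.name}"


# Module-level mapping from connection class name to loader.
CATALOG_TYPE_MAP: Dict[str, Callable[[CatalogConfig], str]] = {
    RestConn.__name__: load_rest_stub,
    HiveConn.__name__: load_hive_stub,
}


def from_connection(catalog: CatalogConfig) -> str:
    """Dispatch on the class name of the connection object."""
    type_name = catalog.type.__class__.__name__
    loader = CATALOG_TYPE_MAP.get(type_name)
    if loader is None:
        raise NotImplementedError(
            f"Iceberg Catalog of type ['{type_name}'] not implemented."
        )
    return loader(catalog)
```

Callers would then write `from_connection(config)` rather than going through a class. Re-exporting it from `__init__.py` would take a single import line there, assuming the function lives in a sibling module.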

Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Iceberg Catalog Factory.
"""
from typing import Dict, Type

from pyiceberg.catalog import Catalog

from metadata.generated.schema.entity.services.connections.database.icebergConnection import (
    Catalog as IcebergCatalog,
    DynamoDbCatalogConnection,
    GlueCatalogConnection,
    HiveCatalogConnection,
    RestCatalogConnection,
)
from metadata.ingestion.source.database.iceberg.catalog.base import IcebergCatalogBase
from metadata.ingestion.source.database.iceberg.catalog.dynamodb import IcebergDynamoDbCatalog
from metadata.ingestion.source.database.iceberg.catalog.glue import IcebergGlueCatalog
from metadata.ingestion.source.database.iceberg.catalog.hive import IcebergHiveCatalog
from metadata.ingestion.source.database.iceberg.catalog.rest import IcebergRestCatalog


class IcebergCatalogFactory:
    # Maps the connection class name to the class that builds the catalog.
    catalog_type_map: Dict[str, Type[IcebergCatalogBase]] = {
        RestCatalogConnection.__name__: IcebergRestCatalog,
        HiveCatalogConnection.__name__: IcebergHiveCatalog,
        GlueCatalogConnection.__name__: IcebergGlueCatalog,
        DynamoDbCatalogConnection.__name__: IcebergDynamoDbCatalog,
    }

    @classmethod
    def from_connection(cls, catalog: IcebergCatalog) -> Catalog:
        """Returns a PyIceberg Catalog from the given catalog configuration."""
        catalog_type_name = catalog.type.__class__.__name__
        catalog_type = cls.catalog_type_map.get(catalog_type_name)

        if not catalog_type:
            raise NotImplementedError(
                f"Iceberg Catalog of type ['{catalog_type_name}'] Not Implemented."
            )

        return catalog_type.get_catalog(catalog)
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Iceberg Catalog base module.
"""

from abc import ABC, abstractmethod

from pyiceberg.catalog import Catalog

from metadata.generated.schema.entity.services.connections.database.icebergConnection import Catalog as IcebergCatalog
from metadata.ingestion.source.database.iceberg.fs import FileSystemConfig, IcebergFileSystemFactory


class IcebergCatalogBase(ABC):
    @classmethod
    @abstractmethod
    def get_catalog(cls, catalog: IcebergCatalog) -> Catalog:
        """Returns a Catalog for the given catalog configuration."""

    @staticmethod
    def get_fs_parameters(fs_config: FileSystemConfig) -> dict:
        """Gets the FileSystem parameters based on the passed configuration."""
        return IcebergFileSystemFactory.parse(fs_config)
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Iceberg DynamoDB Catalog
"""
from pyiceberg.catalog import Catalog, load_dynamodb

from metadata.generated.schema.entity.services.connections.database.icebergConnection import (
    Catalog as IcebergCatalog,
    DynamoDbCatalogConnection,
)
from metadata.ingestion.source.database.iceberg.catalog.base import IcebergCatalogBase


class IcebergDynamoDbCatalog(IcebergCatalogBase):
    @classmethod
    def get_catalog(cls, catalog: IcebergCatalog) -> Catalog:
        """Returns a DynamoDB Catalog for the given connection and file storage.

        For more information, check the PyIceberg [docs](https://py.iceberg.apache.org/configuration/#dynamodb-catalog)
        """
        if not isinstance(catalog.type, DynamoDbCatalogConnection):
            raise RuntimeError("'connection' is not an instance of 'DynamoDbCatalogConnection'")

        parameters = {
            **cls.get_fs_parameters(catalog.fileSystem),
            "warehouse": catalog.warehouseLocation,
        }

        if catalog.type.tableName:
            # Extend the existing parameters so the file system and warehouse keys are kept.
            parameters = {
                **parameters,
                "table-name": catalog.type.tableName,
            }

        if catalog.type.awsConfig:
            # Key names follow the boto3 Session arguments that PyIceberg reads.
            parameters = {
                **parameters,
                "aws_access_key_id": catalog.type.awsConfig.awsAccessKeyId,
                "aws_secret_access_key": catalog.type.awsConfig.awsSecretAccessKey,
                "aws_session_token": catalog.type.awsConfig.awsSessionToken,
                "region_name": catalog.type.awsConfig.awsRegion,
                "profile_name": catalog.type.awsConfig.profileName,
            }

        return load_dynamodb(catalog.name, parameters)
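
The parameter dictionary in `get_catalog` is assembled step by step. A small sketch with made-up values illustrates the pattern; the `build_parameters` helper and its arguments are hypothetical and not part of the PR. It shows that each optional block needs to spread the existing dictionary with `**parameters` if earlier keys such as `warehouse` are to survive the reassignment.

```python
from typing import Dict, Optional


def build_parameters(
    warehouse: str,
    table_name: Optional[str] = None,
    region: Optional[str] = None,
) -> Dict[str, str]:
    # Start from the keys every catalog needs.
    parameters = {"warehouse": warehouse}

    if table_name:
        # Spread the existing dict so "warehouse" is kept.
        parameters = {**parameters, "table-name": table_name}

    if region:
        parameters = {**parameters, "region_name": region}

    return parameters
```

Writing `parameters = {"table-name": table_name}` without the spread would silently discard everything collected before that point.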
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Iceberg Glue Catalog
"""
from pyiceberg.catalog import Catalog, load_glue

from metadata.generated.schema.entity.services.connections.database.icebergConnection import (
    Catalog as IcebergCatalog,
    GlueCatalogConnection,
)
from metadata.ingestion.source.database.iceberg.catalog.base import IcebergCatalogBase


class IcebergGlueCatalog(IcebergCatalogBase):
    @classmethod
    def get_catalog(cls, catalog: IcebergCatalog) -> Catalog:
        """Returns a Glue Catalog for the given connection and file storage.

        For more information, check the PyIceberg [docs](https://py.iceberg.apache.org/configuration/#glue-catalog)
        """
        if not isinstance(catalog.type, GlueCatalogConnection):
            raise RuntimeError("'connection' is not an instance of 'GlueCatalogConnection'")

        parameters = {
            **cls.get_fs_parameters(catalog.fileSystem),
            "warehouse": catalog.warehouseLocation,
        }

        if catalog.type.awsConfig:
            # Key names follow the boto3 Session arguments that PyIceberg reads.
            parameters = {
                **parameters,
                "aws_access_key_id": catalog.type.awsConfig.awsAccessKeyId,
                "aws_secret_access_key": catalog.type.awsConfig.awsSecretAccessKey,
                "aws_session_token": catalog.type.awsConfig.awsSessionToken,
                "region_name": catalog.type.awsConfig.awsRegion,
                "profile_name": catalog.type.awsConfig.profileName,
            }

        return load_glue(catalog.name, parameters)
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Iceberg Hive Catalog
"""
from pyiceberg.catalog import Catalog, load_hive

from metadata.generated.schema.entity.services.connections.database.icebergConnection import (
    Catalog as IcebergCatalog,
    HiveCatalogConnection,
)
from metadata.ingestion.source.database.iceberg.catalog.base import IcebergCatalogBase


class IcebergHiveCatalog(IcebergCatalogBase):
    @classmethod
    def get_catalog(cls, catalog: IcebergCatalog) -> Catalog:
        """Returns a Hive Catalog for the given connection and file storage.

        For more information, check the PyIceberg [docs](https://py.iceberg.apache.org/configuration/#hive-catalog)
        """
        if not isinstance(catalog.type, HiveCatalogConnection):
            raise RuntimeError("'connection' is not an instance of 'HiveCatalogConnection'")

        parameters = {
            **cls.get_fs_parameters(catalog.fileSystem),
            "warehouse": catalog.warehouseLocation,
            "uri": catalog.type.uri,
        }

        return load_hive(catalog.name, parameters)
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Iceberg Rest Catalog
"""
from pyiceberg.catalog import Catalog, load_rest

from metadata.generated.schema.entity.services.connections.database.icebergConnection import (
    Catalog as IcebergCatalog,
    RestCatalogConnection,
)
from metadata.ingestion.source.database.iceberg.catalog.base import IcebergCatalogBase


class IcebergRestCatalog(IcebergCatalogBase):
    @classmethod
    def get_catalog(cls, catalog: IcebergCatalog) -> Catalog:
        """Returns a Rest Catalog for the given connection and file storage.

        For more information, check the PyIceberg [docs](https://py.iceberg.apache.org/configuration/#rest-catalog)
        """
        if not isinstance(catalog.type, RestCatalogConnection):
            raise RuntimeError("'connection' is not an instance of 'RestCatalogConnection'")

        parameters = {
            **cls.get_fs_parameters(catalog.fileSystem),
            "warehouse": catalog.warehouseLocation,
            "uri": catalog.type.uri,
            "credential": catalog.type.credential,
            "token": catalog.type.token,
        }

        if catalog.type.ssl:
            parameters = {
                **parameters,
                "ssl": {
                    "client": {
                        "cert": catalog.type.ssl.clientCertPath,
                        "key": catalog.type.ssl.privateKeyPath,
                    },
                    "cabundle": catalog.type.ssl.caCertPath,
                },
            }

        if catalog.type.sigv4:
            parameters = {
                **parameters,
                "rest.sigv4": True,
                "rest.signing_region": catalog.type.sigv4.signingRegion,
                "rest.signing_name": catalog.type.sigv4.signingName,
            }

        return load_rest(catalog.name, parameters)