Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support DBT Manifests from Snowflake Stage #116

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,20 @@ manifests:
object_name: <YOUR OBJECT NAME> # The object name of your manifest file.
```

#### Using Snowflake Stage as an artifact source

You can use dbt-loom to fetch manifest files from Snowflake Stage by setting up an `snowflake` manifest in your `dbt-loom` config.


```yaml
manifests:
- name: project_name
type: snowflake
config:
stage: stage_name # Stage name, can include Database/Schema
stage_path: path/to/dbt/manifest.json # Path to manifest file in the stage
```

### Using environment variables

You can easily incorporate your own environment variables into the config file. This allows for dynamic configuration values that can change based on the environment. To specify an environment variable in the `dbt-loom` config file, use one of the following formats:
Expand Down
63 changes: 63 additions & 0 deletions dbt_loom/clients/snowflake_stage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import gzip
import json
import tempfile
from pathlib import PurePath, Path, PosixPath, PurePosixPath
from typing import Dict

from dbt.config.runtime import load_profile
from dbt.flags import get_flags
from dbt.mp_context import get_mp_context
from pydantic import BaseModel


class SnowflakeReferenceConfig(BaseModel):
"""Configuration for an reference stored in Snowflake Stage"""

stage: str
stage_path: str


class SnowflakeClient:
"""A client for loading manifest files from Snowflake Stage."""

def __init__(self, stage: str, stage_path: str) -> None:
self.stage = stage
self.stage_path = stage_path

def load_manifest(self) -> Dict:
"""Load the manifest.json file from Snowflake stage."""

# Import locally to not require dbt-snowflake to be installed
from dbt.adapters.snowflake import SnowflakeAdapter

flags = get_flags()
profile = load_profile(
project_root=flags.PROJECT_DIR,
cli_vars=flags.VARS,
profile_name_override=flags.PROFILE,
target_override=flags.TARGET,
)
adapter = SnowflakeAdapter(profile, get_mp_context())
file_name = str(PurePosixPath(self.stage_path).name)
tmp_dir = tempfile.mkdtemp(prefix="dbt_loom_")
# Snowflake needs '/' path separators
tmp_dir_sf = tmp_dir.replace("\\", "/")

with adapter.connection_named("dbt-loom"):
get_query = f"get @{self.stage}/{self.stage_path} file://{tmp_dir_sf}/"
response, table = adapter.connections.execute(get_query)
if response.rows_affected == 0:
raise Exception(
f"Failed to get file {self.stage}/{self.stage_path}: {response}"
)

download_path = Path(tmp_dir) / file_name

if download_path.name.endswith(".gz"):
with gzip.GzipFile(download_path) as gzip_file:
content = gzip_file.read().decode("utf-8")
else:
with download_path.open("r") as f:
content = f.read()

return json.loads(content)
3 changes: 3 additions & 0 deletions dbt_loom/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from dbt_loom.clients.dbt_cloud import DbtCloudReferenceConfig
from dbt_loom.clients.gcs import GCSReferenceConfig
from dbt_loom.clients.s3 import S3ReferenceConfig
from dbt_loom.clients.snowflake_stage import SnowflakeReferenceConfig


class ManifestReferenceType(str, Enum):
Expand All @@ -20,6 +21,7 @@ class ManifestReferenceType(str, Enum):
gcs = "gcs"
s3 = "s3"
azure = "azure"
snowflake = "snowflake"


class FileReferenceConfig(BaseModel):
Expand Down Expand Up @@ -54,6 +56,7 @@ class ManifestReference(BaseModel):
GCSReferenceConfig,
S3ReferenceConfig,
AzureReferenceConfig,
SnowflakeReferenceConfig,
]
excluded_packages: List[str] = Field(default_factory=list)

Expand Down
12 changes: 12 additions & 0 deletions dbt_loom/manifests.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from pydantic import BaseModel, Field, validator
import requests

from dbt_loom.clients.snowflake_stage import SnowflakeReferenceConfig, SnowflakeClient

try:
from dbt.artifacts.resources.types import NodeType
except ModuleNotFoundError:
Expand Down Expand Up @@ -105,6 +107,7 @@ def __init__(self):
ManifestReferenceType.gcs: self.load_from_gcs,
ManifestReferenceType.s3: self.load_from_s3,
ManifestReferenceType.azure: self.load_from_azure,
ManifestReferenceType.snowflake: self.load_from_snowflake,
}

@staticmethod
Expand Down Expand Up @@ -212,6 +215,15 @@ def load_from_azure(config: AzureReferenceConfig) -> Dict:

return azure_client.load_manifest()

@staticmethod
def load_from_snowflake(config: SnowflakeReferenceConfig) -> Dict:
"""Load a manifest dictionary from Snowflake stage."""
snowflake_client = SnowflakeClient(
stage=config.stage, stage_path=config.stage_path
)

return snowflake_client.load_manifest()

def load(self, manifest_reference: ManifestReference) -> Dict:
"""Load a manifest dictionary based on a ManifestReference input."""

Expand Down
Loading