From 31c74f652d17c1ea7b11c28ea12f65bc55f1898a Mon Sep 17 00:00:00 2001 From: Will Jones Date: Tue, 29 Nov 2022 22:53:51 -0800 Subject: [PATCH] test(python): add azure integration tests (#912) # Description Adding Azure integration tests to the Python bindings. ~~Will need to rebase after we merge #893.~~ # Related Issue(s) - fixes #955 # Documentation --- .github/workflows/python_build.yml | 2 +- python/deltalake/writer.py | 5 +- python/pyproject.toml | 1 + python/tests/conftest.py | 93 ++++++++++++++++++++++++++++++ python/tests/test_fs.py | 64 ++++++++++++++++++++ 5 files changed, 161 insertions(+), 4 deletions(-) diff --git a/.github/workflows/python_build.yml b/.github/workflows/python_build.yml index 11affcab31..f2f91a5bdd 100644 --- a/.github/workflows/python_build.yml +++ b/.github/workflows/python_build.yml @@ -107,7 +107,7 @@ jobs: - name: Run tests run: | source venv/bin/activate - python -m pytest -m 's3 and integration' + python -m pytest -m '((s3 or azure) and integration) or not integration' - name: Test without pandas run: | diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index 41de1961c6..f1419cbfd3 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -161,9 +161,8 @@ def write_deltalake( if filesystem is None: if table is not None: - storage_options = dict( - **(table._storage_options or {}), **(storage_options or {}) - ) + storage_options = dict( + **(table._storage_options or {}), **(storage_options or {})) filesystem = pa_fs.PyFileSystem(DeltaStorageHandler(table_uri, storage_options)) diff --git a/python/pyproject.toml b/python/pyproject.toml index 7db7f0d750..9321c5e937 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -79,6 +79,7 @@ testpaths = [ markers = [ "integration: marks tests as integration tests (deselect with '-m \"not integration\"')", "s3: marks tests as integration tests with S3 (deselect with '-m \"not s3\"')", + "azure: marks tests as integration tests with 
Azure Blob Store", "pandas: marks tests that require pandas", "pyspark: marks tests that require pyspark", ] \ No newline at end of file diff --git a/python/tests/conftest.py b/python/tests/conftest.py index 2c332afdb9..92d4c9b43f 100644 --- a/python/tests/conftest.py +++ b/python/tests/conftest.py @@ -87,6 +87,99 @@ def s3_localstack(monkeypatch, s3_localstack_creds): monkeypatch.setenv(key, value) +@pytest.fixture(scope="session") +def azurite_creds(): + # These are the well-known values + # https://learn.microsoft.com/en-us/azure/storage/common/storage-use-azurite?tabs=visual-studio#well-known-storage-account-and-key + config = dict( + AZURE_STORAGE_ACCOUNT_NAME="devstoreaccount1", + AZURE_STORAGE_ACCOUNT_KEY="Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==", + AZURE_STORAGE_CONTAINER_NAME="deltars", + AZURE_STORAGE_USE_EMULATOR="true", + AZURE_STORAGE_USE_HTTP="true", + ) + + endpoint_url = f"http://localhost:10000/{config['AZURE_STORAGE_ACCOUNT_NAME']}" + + env = os.environ.copy() + env.update(config) + env["AZURE_STORAGE_CONNECTION_STRING"] = ( + f"DefaultEndpointsProtocol=http;" + f"AccountName={config['AZURE_STORAGE_ACCOUNT_NAME']};" + f"AccountKey={config['AZURE_STORAGE_ACCOUNT_KEY']};" + f"BlobEndpoint={endpoint_url};" + ) + + try: + subprocess.run( + [ + "az", + "storage", + "container", + "create", + "--name", + config["AZURE_STORAGE_CONTAINER_NAME"], + ], + env=env, + ) + except OSError: + pytest.skip("azure cli is not installed") + + yield config + + subprocess.run( + [ + "az", + "storage", + "container", + "delete", + "--name", + config["AZURE_STORAGE_CONTAINER_NAME"], + ], + env=env, + ) + + +@pytest.fixture() +def azurite_env_vars(monkeypatch, azurite_creds): + for key, value in azurite_creds.items(): + monkeypatch.setenv(key, value) + + +@pytest.fixture() +def azurite_sas_creds(azurite_creds): + endpoint_url = ( + f"http://localhost:10000/{azurite_creds['AZURE_STORAGE_ACCOUNT_NAME']}" + ) + env = 
os.environ.copy() + env.update(azurite_creds) + env["AZURE_STORAGE_CONNECTION_STRING"] = ( + f"DefaultEndpointsProtocol=http;" + f"AccountName={azurite_creds['AZURE_STORAGE_ACCOUNT_NAME']};" + f"AccountKey={azurite_creds['AZURE_STORAGE_ACCOUNT_KEY']};" + f"BlobEndpoint={endpoint_url};" + ) + output = subprocess.run( + [ + "az", + "storage", + "container", + "generate-sas", + "--name", + azurite_creds["AZURE_STORAGE_CONTAINER_NAME"], + "--permissions", + "dlrw", + ], + env=env, + capture_output=True, + ) + + creds = {key: value for key, value in azurite_creds.items() if "KEY" not in key} + creds["SAS_TOKEN"] = output.stdout.decode() + + return creds + + @pytest.fixture() def sample_data(): nrows = 5 diff --git a/python/tests/test_fs.py b/python/tests/test_fs.py index 86c4c19593..4113b9a2a6 100644 --- a/python/tests/test_fs.py +++ b/python/tests/test_fs.py @@ -140,3 +140,67 @@ def test_roundtrip_s3_direct(s3_localstack_creds, sample_data: pa.Table): # assert dt.version() == 2 # table = dt.to_pyarrow_table() # assert table == sample_data + + +@pytest.mark.azure +@pytest.mark.integration +@pytest.mark.timeout(timeout=5, method="thread") +def test_roundtrip_azure_env(azurite_env_vars, sample_data: pa.Table): + table_path = "az://deltars/roundtrip" + + # Create new table with path + write_deltalake(table_path, sample_data) + dt = DeltaTable(table_path) + table = dt.to_pyarrow_table() + assert table == sample_data + assert dt.version() == 0 + + # Write with existing DeltaTable + write_deltalake(dt, sample_data, mode="overwrite") + dt.update_incremental() + assert dt.version() == 1 + + table = dt.to_pyarrow_table() + assert table == sample_data + + +@pytest.mark.azure +@pytest.mark.integration +@pytest.mark.timeout(timeout=5, method="thread") +def test_roundtrip_azure_direct(azurite_creds, sample_data: pa.Table): + table_path = "az://deltars/roundtrip2" + + # Fails without any creds + with pytest.raises(PyDeltaTableError): + anon_storage_options = { + key: value for key, 
value in azurite_creds.items() if "ACCOUNT" not in key + } + write_deltalake(table_path, sample_data, storage_options=anon_storage_options) + + # Can pass storage_options in directly + write_deltalake(table_path, sample_data, storage_options=azurite_creds) + dt = DeltaTable(table_path, storage_options=azurite_creds) + table = dt.to_pyarrow_table() + assert table == sample_data + assert dt.version() == 0 + + # Can pass storage_options into DeltaTable and then write + write_deltalake(dt, sample_data, mode="overwrite") + dt.update_incremental() + assert dt.version() == 1 + + table = dt.to_pyarrow_table() + assert table == sample_data + + +@pytest.mark.azure +@pytest.mark.integration +@pytest.mark.timeout(timeout=5, method="thread") +def test_roundtrip_azure_sas(azurite_sas_creds, sample_data: pa.Table): + table_path = "az://deltars/roundtrip3" + + write_deltalake(table_path, sample_data, storage_options=azurite_sas_creds) + dt = DeltaTable(table_path, storage_options=azurite_sas_creds) + table = dt.to_pyarrow_table() + assert table == sample_data + assert dt.version() == 0