diff --git a/CHANGELOG.md b/CHANGELOG.md index cafc51da..724e89b2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +### Changed - [BREAKING] Dropped support for python version 3.7, as it is on EOL for over year. - [BREAKING] Aligned the Connection String Builder keywords with the rest of the SDKs. This means that some keywords were removed, and they will no longer be parsed as part of the Connection String. @@ -17,7 +18,12 @@ The following keywords have been removed: - `interactive_login` - `az_cli` +### Fixed +- Fixed ingestion of pandas DataFrames that contain nested values. + ## [4.6.3] - 2025-01-08 + +### Fixed - Explicitly export members in `__init__.py` via `__all__` ## [4.6.2] - 2025-01-07 diff --git a/azure-kusto-ingest/azure/kusto/ingest/base_ingest_client.py b/azure-kusto-ingest/azure/kusto/ingest/base_ingest_client.py index ecd182f7..050c2bfe 100644 --- a/azure-kusto-ingest/azure/kusto/ingest/base_ingest_client.py +++ b/azure-kusto-ingest/azure/kusto/ingest/base_ingest_client.py @@ -1,10 +1,10 @@ +import gzip import ipaddress import os import tempfile import time import uuid from abc import ABCMeta, abstractmethod -from copy import copy from enum import Enum from io import TextIOWrapper from typing import TYPE_CHECKING, Union, IO, AnyStr, Optional, Tuple @@ -12,11 +12,9 @@ from azure.kusto.data.data_format import DataFormat from azure.kusto.data.exceptions import KustoClosedError - from .descriptors import FileDescriptor, StreamDescriptor from .ingestion_properties import IngestionProperties - if TYPE_CHECKING: import pandas @@ -117,12 +115,11 @@ def ingest_from_dataframe(self, df: "pandas.DataFrame", ingestion_properties: In if not isinstance(df, DataFrame): raise ValueError("Expected DataFrame instance, found {}".format(type(df))) - file_name = "df_{id}_{timestamp}_{uid}.csv.gz".format(id=id(df), timestamp=int(time.time()), uid=uuid.uuid4()) + file_name = 
"df_{id}_{timestamp}_{uid}.json.gz".format(id=id(df), timestamp=int(time.time()), uid=uuid.uuid4()) temp_file_path = os.path.join(tempfile.gettempdir(), file_name) - - df.to_csv(temp_file_path, index=False, encoding="utf-8", header=False, compression="gzip") - - ingestion_properties.format = DataFormat.CSV + with gzip.open(temp_file_path, "wb") as temp_file: + df.to_json(temp_file, orient="records", date_format="iso", lines=True) + ingestion_properties.format = DataFormat.JSON try: return self.ingest_from_file(temp_file_path, ingestion_properties) diff --git a/azure-kusto-ingest/tests/test_e2e_ingest.py b/azure-kusto-ingest/tests/test_e2e_ingest.py index c9eb743d..f1167946 100644 --- a/azure-kusto-ingest/tests/test_e2e_ingest.py +++ b/azure-kusto-ingest/tests/test_e2e_ingest.py @@ -529,15 +529,20 @@ async def test_streaming_ingest_from_dataframe(self): "xtextWithNulls", "xdynamicWithNulls", ] - rows = [ - [0, "00000000-0000-0000-0001-020304050607", 0.0, 0.0, 0, 0, 0, 0, 0, 0, 0, 0, "2014-01-01T01:01:01Z", "Zero", "Zero", "0", "00:00:00", None, ""] - ] + + guid = uuid.uuid4() + + dynamic_value = ["me@dummy.com", "you@dummy.com", "them@dummy.com"] + rows = [[0, str(guid), 0.0, 0.0, 0, 0, 0, 0, 0, 0, 0, 0, "2014-01-01T01:01:01Z", "Zero", "Zero", "0", "00:00:00", None, dynamic_value]] df = DataFrame(data=rows, columns=fields) ingestion_properties = IngestionProperties(database=self.test_db, table=self.test_table, flush_immediately=True, data_format=DataFormat.CSV) self.ingest_client.ingest_from_dataframe(df, ingestion_properties) await self.assert_rows_added(1, timeout=120) + a = self.client.execute(self.test_db, f"{self.test_table} | where rowguid == '{guid}'") + assert a.primary_results[0].rows[0]["xdynamicWithNulls"] == dynamic_value + @pytest.mark.asyncio async def test_streaming_ingest_from_blob(self, is_managed_streaming): ingestion_properties = IngestionProperties( diff --git a/azure-kusto-ingest/tests/test_kusto_ingest_client.py 
b/azure-kusto-ingest/tests/test_kusto_ingest_client.py index 3e7329fd..de046954 100644 --- a/azure-kusto-ingest/tests/test_kusto_ingest_client.py +++ b/azure-kusto-ingest/tests/test_kusto_ingest_client.py @@ -518,11 +518,11 @@ def test_simple_ingest_from_dataframe(self, mock_pid, mock_time, mock_uuid, mock result = ingest_client.ingest_from_dataframe(df, ingestion_properties=ingestion_properties) assert result.status == IngestionStatus.QUEUED - expected_url = "https://storageaccount.blob.core.windows.net/tempstorage/database__table__11111111-1111-1111-1111-111111111111__df_{0}_100_11111111-1111-1111-1111-111111111111.csv.gz?".format( + expected_url = "https://storageaccount.blob.core.windows.net/tempstorage/database__table__11111111-1111-1111-1111-111111111111__df_{0}_100_11111111-1111-1111-1111-111111111111.json.gz?".format( id(df) ) - assert_queued_upload(mock_put_message_in_queue, mock_upload_blob_from_stream, expected_url) + assert_queued_upload(mock_put_message_in_queue, mock_upload_blob_from_stream, expected_url, format="json") @responses.activate @patch("azure.kusto.ingest.managed_streaming_ingest_client.ManagedStreamingIngestClient.MAX_STREAMING_SIZE_IN_BYTES", new=0)