Change: Make ManagedStreamingQueuingPolicy internal, expose just a factor #413

Open
wants to merge 3 commits into base: master

ohadbitt
Collaborator

Change:

  • Make ManagedStreamingQueuingPolicy internal, expose just a factor
  • Don't allow users to pass the raw data size; provide it only if we have it

Expose just a factor instead of the whole managed policy
ohadbitt requested a review from AsafMah on January 27, 2025 14:17
github-actions bot commented Jan 28, 2025

Test Results

149 tests   - 183   149 ✅  - 174   6s ⏱️ - 3m 25s
 14 suites  -  13     0 💤  -   9 
 14 files    -  13     0 ❌ ±  0 

Results for commit 49888c1. ± Comparison against base commit 9884e0a.

This pull request removes 183 tests.
com.microsoft.azure.kusto.ingest.AzureStorageClientTest ‑ UploadStreamToBlob_NullInputStream_IllegalArgumentException
com.microsoft.azure.kusto.ingest.AzureStorageClientTest ‑ compressAndStream_NullBlob_IllegalArgumentException
com.microsoft.azure.kusto.ingest.AzureStorageClientTest ‑ compressAndStream_NullStream_IllegalArgumentException
com.microsoft.azure.kusto.ingest.AzureStorageClientTest ‑ compressAndUploadFileToBlob_NullBlob_IllegalArgumentException
com.microsoft.azure.kusto.ingest.AzureStorageClientTest ‑ compressAndUploadFileToBlob_NullFilePath_IllegalArgumentException
com.microsoft.azure.kusto.ingest.AzureStorageClientTest ‑ postMessageToQueue_NullContent_IllegalArgumentException
com.microsoft.azure.kusto.ingest.AzureStorageClientTest ‑ postMessageToQueue_NullEntity_IllegalArgumentException
com.microsoft.azure.kusto.ingest.AzureStorageClientTest ‑ postMessageToQueue_NullQueuePath_IllegalArgumentException
com.microsoft.azure.kusto.ingest.AzureStorageClientTest ‑ postMessageToQueue_NullTableUri_IllegalArgumentException
com.microsoft.azure.kusto.ingest.AzureStorageClientTest ‑ uploadFileToBlob_NullBlob_IllegalArgumentException
…

♻️ This comment has been updated with latest results.

/*
* For internal usage, adding blobExactSize
*/
public static BlobSourceInfo fromFile(String blobPath, FileSourceInfo fileSourceInfo, CompressionType sourceCompressionType, boolean gotCompressed) {
Contributor

If it's for internal use - can we lower the visibility?

Collaborator Author

No, because it's used outside the package.

Contributor

Ok, so maybe we want to document it like a public function?

Not sure what the right call is here.

/*
* For internal usage, adding blobExactSize
*/
public static BlobSourceInfo fromFile(String blobPath, FileSourceInfo fileSourceInfo, CompressionType sourceCompressionType, boolean gotCompressed) {
Contributor

The param on the outside is "shouldCompress", but here it's called "gotCompressed" - any reason?

Collaborator Author

The client should care whether it should compress. File naming should only care whether the file was compressed.

@@ -23,14 +23,8 @@ public FileSourceInfo(String filePath) {
this.filePath = filePath;
}

// An estimation of the raw (uncompressed, un-indexed) size of the data, for binary formatted files - use only if known
public FileSourceInfo(String filePath, long rawSizeInBytes) {
public FileSourceInfo(String filePath, UUID sourceId) {
Contributor

In V2 SourceID is not UUID, so maybe we already want to treat it as a string?

Collaborator Author

It's not V2 here.

Contributor

Yes, but just so we don't break the interface when we move to V2.
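
To make the suggestion concrete, here is a minimal sketch of storing the source ID as a string while still accepting a UUID from current callers. The class name `FileSourceSketch` and its accessors are hypothetical and are not the SDK's `FileSourceInfo`; this only illustrates the overload idea, assuming a later version would pass arbitrary string IDs.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical stand-in for a source-info class; not the SDK's FileSourceInfo.
final class FileSourceSketch {
    private final String filePath;
    private final String sourceId; // stored as a string so non-UUID IDs fit later

    // String-based constructor: the form a future version could keep unchanged.
    FileSourceSketch(String filePath, String sourceId) {
        this.filePath = Objects.requireNonNull(filePath, "filePath");
        this.sourceId = sourceId; // may be null when the caller has no ID
    }

    // UUID convenience overload for existing callers; delegates to the string form.
    FileSourceSketch(String filePath, UUID sourceId) {
        this(filePath, sourceId == null ? null : sourceId.toString());
    }

    String getFilePath() {
        return filePath;
    }

    String getSourceId() {
        return sourceId;
    }
}
```

One trade-off with this shape: a literal `null` second argument becomes ambiguous between the two overloads, so callers would need a cast such as `(String) null`.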

@@ -39,22 +39,7 @@ public static StreamSourceInfo fileToStream(FileSourceInfo fileSourceInfo, boole
}

CompressionType compression = getCompression(filePath);
Contributor

We no longer rely on format.isCompressible?

Collaborator Author

Not here. It's handled in the policy.

}
}

private void copyStream(InputStream inputStream, OutputStream outputStream, int bufferSize) throws IOException {
// Returns original stream size
private int copyStream(InputStream inputStream, OutputStream outputStream, int bufferSize) throws IOException {
byte[] buffer = new byte[bufferSize];
Contributor

I think there is a helper in Apache for copying streams; you should check.
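
For comparison, a minimal JDK-only sketch of a copy that returns the number of bytes copied. The class and method names are hypothetical, and the `long` return type is an assumption made here to avoid overflow on large streams (the diff above returns `int`). Apache Commons IO offers `IOUtils.copy(InputStream, OutputStream)`, which returns the count as an `int`, and `IOUtils.copyLarge`, which returns a `long`; on Java 9+ `InputStream.transferTo` does the same without a dependency.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical helper class for illustration only.
final class StreamCopySketch {
    private StreamCopySketch() {
    }

    // Copies the whole input to the output and returns the number of bytes copied.
    static long copyStream(InputStream in, OutputStream out, int bufferSize) throws IOException {
        if (bufferSize <= 0) {
            // A zero-length buffer would make read() return 0 forever.
            throw new IllegalArgumentException("bufferSize must be positive");
        }
        byte[] buffer = new byte[bufferSize];
        long total = 0;
        int read;
        while ((read = in.read(buffer)) != -1) {
            out.write(buffer, 0, read);
            total += read;
        }
        return total;
    }

    // Java 9+ alternative: transferTo also returns the number of bytes transferred.
    static long copyWithTransferTo(InputStream in, OutputStream out) throws IOException {
        return in.transferTo(out);
    }
}
```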

* {@link ManagedStreamingQueuingPolicy.Default} which is created with no factor.
* @param factor - Default is 1.
**/
public void setQueuingPolicyFactor(double factor) {
Contributor

Update CHANGELOG.md with all the changes, and mention this one specifically as a breaking change.
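
As a rough illustration of why this is a breaking change for callers, here is a sketch of a client that hides its queuing decision and exposes only a factor. Everything in it is an assumption for illustration: the class name, the 4 MiB base limit, and the size comparison are not taken from the PR. Only the setter name and the default of 1 come from the Javadoc above.

```java
// Hypothetical sketch; not the SDK's ManagedStreamingQueuingPolicy.
final class FactorPolicySketch {
    // Assumed base limit for this sketch only (4 MiB).
    static final long BASE_LIMIT_BYTES = 4L * 1024 * 1024;

    // The PR's Javadoc states the default factor is 1.
    private double factor = 1.0;

    // The only knob callers get: scales the internal size limit.
    public void setQueuingPolicyFactor(double factor) {
        // !(factor > 0) also rejects NaN.
        if (!(factor > 0) || Double.isInfinite(factor)) {
            throw new IllegalArgumentException("factor must be a positive, finite number");
        }
        this.factor = factor;
    }

    // Internal decision: data above the scaled limit goes to queued ingestion.
    boolean shouldUseQueuedIngestion(long dataSizeBytes) {
        return dataSizeBytes > BASE_LIMIT_BYTES * factor;
    }
}
```

Callers that previously supplied their own policy object would have to switch to the factor, which is why the changelog entry matters.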

@@ -83,12 +84,13 @@ public static void postToQueueWithRetries(ResourceManager resourceManager, Azure
Collections.singletonMap("blob", SecurityUtils.removeSecretsFromUrl(blob.getBlobPath())));
}

public static String uploadStreamToBlobWithRetries(ResourceManager resourceManager, AzureStorageClient azureStorageClient, InputStream stream,
public static Pair<String, Integer> uploadStreamToBlobWithRetries(ResourceManager resourceManager, AzureStorageClient azureStorageClient,
Contributor

Following on from the previous comment, maybe you should just have an UploadResult type holding the string and the size. It would be clearer both here and in the inner methods that return int.
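
A minimal sketch of what such a type could look like in place of `Pair<String, Integer>`. Only the name `UploadResult` comes from the comment above; the field names, the `long` size, and the accessors are assumptions.

```java
import java.util.Objects;

// Hypothetical value type; fields and accessors are assumptions for illustration.
final class UploadResult {
    private final String blobPath;
    private final long sizeInBytes;

    UploadResult(String blobPath, long sizeInBytes) {
        this.blobPath = Objects.requireNonNull(blobPath, "blobPath");
        if (sizeInBytes < 0) {
            throw new IllegalArgumentException("sizeInBytes must be non-negative");
        }
        this.sizeInBytes = sizeInBytes;
    }

    String getBlobPath() {
        return blobPath;
    }

    long getSizeInBytes() {
        return sizeInBytes;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof UploadResult)) {
            return false;
        }
        UploadResult that = (UploadResult) other;
        return sizeInBytes == that.sizeInBytes && blobPath.equals(that.blobPath);
    }

    @Override
    public int hashCode() {
        return Objects.hash(blobPath, sizeInBytes);
    }

    @Override
    public String toString() {
        return "UploadResult{blobPath=" + blobPath + ", sizeInBytes=" + sizeInBytes + "}";
    }
}
```

Named accessors make call sites read as `result.getBlobPath()` rather than `pair.getLeft()`, so nobody has to remember which side of the pair is which.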

@Nullable String clientRequestId)
throws IngestionClientException, IngestionServiceException {
String blobPath = blobSourceInfo.getBlobPath();
try {
// No need to check blob size if it was given to us that it's not empty
if (blobSourceInfo.getRawSizeInBytes() == 0 && cloudBlockBlob.getProperties().getBlobSize() == 0) {
Contributor

Do we check for empty blob elsewhere?
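
For context on the condition in question, a small sketch of the short-circuit it relies on: the remote size lookup only runs when no size was supplied. The names here are hypothetical, and a `LongSupplier` stands in for the storage call (`cloudBlockBlob.getProperties().getBlobSize()` in the diff).

```java
import java.util.function.LongSupplier;

// Hypothetical helper for illustration; the supplier stands in for the storage call.
final class EmptyBlobCheckSketch {
    private EmptyBlobCheckSketch() {
    }

    // Returns true only when no size was given AND the remote blob reports zero bytes.
    // Because && short-circuits, the supplier is not invoked when rawSizeInBytes != 0.
    static boolean isEmpty(long rawSizeInBytes, LongSupplier remoteBlobSize) {
        return rawSizeInBytes == 0 && remoteBlobSize.getAsLong() == 0;
    }
}
```

Under this shape, a caller that passes a wrong non-zero size skips the check entirely, which is part of why it matters whether an empty-blob check exists elsewhere.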

managedStreamingIngestClientSpy = spy(
new ManagedStreamingIngestClient(mock(StreamingIngestClient.class), queuedIngestClientMock, new ExponentialRetry(1)));
}
// @BeforeAll
Contributor

Why is everything commented out?

Contributor

AsafMah left a comment

Some comments
