
Feature/add kusto streaming feature #364

Merged
35 commits merged on Jan 2, 2025
Commits
919036b
Adding kusto stream ingestion
Apr 7, 2023
b60172e
Adding additional tests for stream ingestion
Apr 12, 2023
6163781
Updating comments and log messages
Apr 12, 2023
3c10d6c
Updating KustoSink.md doc
Apr 12, 2023
aab5d62
Chunking streaming requests into 4MB chunks
May 12, 2023
bd04b70
Clearing cache to support CreateIfNotExists for stream ingestion
May 12, 2023
a5f165b
Updating comments
May 12, 2023
bbd6cfa
Updated with recent commits
May 17, 2023
bb2ed23
Updating writeMode default to align with previous value
May 18, 2023
81c233f
* Reformat code and run tests
ag-ramachandran Feb 5, 2024
59c7a65
* Attempt to change streaming ingest to managed streaming ingest
ag-ramachandran Feb 5, 2024
218d237
Update docs/KustoSink.md
ag-ramachandran Feb 28, 2024
ed8b59c
* merge conflicts from master
ag-ramachandran Oct 11, 2024
be7a7f6
* Consolidate all commits for comparison
ag-ramachandran Mar 5, 2024
45b98a6
* Remove GZIP input
ag-ramachandran Mar 6, 2024
4a7c6a5
* Remove GZIP client
ag-ramachandran Mar 6, 2024
9d3a488
* Remove GZIP client
ag-ramachandran Mar 6, 2024
d413d9a
* Fix review comment on tags and createdDate not being supported in S…
ag-ramachandran Mar 7, 2024
0bee66e
stream nxt line
ohbitton Mar 11, 2024
214f8b5
Fix
ohbitton Mar 12, 2024
5036df9
* Make the streaming batch size configurable
ag-ramachandran Mar 19, 2024
549c63e
* Fix compilation
ag-ramachandran Jul 2, 2024
2f01be6
* Fix some more review comments
ag-ramachandran Jul 15, 2024
14dc056
* Fix some more review comments
ag-ramachandran Jul 15, 2024
cc7b207
* Add tests
ag-ramachandran Jul 15, 2024
d6bf015
* Fix review comments
ag-ramachandran Jul 15, 2024
a5b1b45
* Fix ingestion properties code
ag-ramachandran Jul 16, 2024
46245d2
* Update version
ag-ramachandran Oct 11, 2024
1d27797
* Update revision
ag-ramachandran Dec 12, 2024
0f1306d
* Fix Review comments
ag-ramachandran Dec 16, 2024
92ee97f
* Add documentation for Streaming ingest limitations
ag-ramachandran Dec 16, 2024
d396b5e
Update docs/KustoSink.md
ag-ramachandran Jan 1, 2025
5f98035
Update connector/src/main/scala/com/microsoft/kusto/spark/datasink/Ku…
ag-ramachandran Jan 1, 2025
675fd6b
Update docs/KustoSink.md
ag-ramachandran Jan 1, 2025
15222ef
Merge branch 'master' into feature/AddKustoStreamingFeature
ag-ramachandran Jan 1, 2025
3 changes: 3 additions & 0 deletions .github/workflows/release.yml
@@ -31,8 +31,11 @@ jobs:
access_token=$(az account get-access-token --resource=${{ secrets.APP_ID }} --scope=${{ secrets.CLUSTER }}/.default --query accessToken -o tsv)
echo "ACCESS_TOKEN=$access_token" >> $GITHUB_ENV
- name: Run the Maven verify phase
run: mvn clean verify -DkustoAadAppId=${{ secrets.APP_ID }} -DkustoAadAuthorityID=${{ secrets.TENANT_ID }} -DkustoAadAppSecret=${{ secrets.APP_SECRET }} -DkustoDatabase=${{ secrets.DATABASE }} -DkustoCluster=${{ secrets.CLUSTER }}
env:
kustoAadAppId: ${{ secrets.APP_ID }}
kustoAadAuthorityID: ${{ secrets.TENANT_ID }}
kustoAadAppSecret: ${{ secrets.APP_SECRET }}
kustoDatabase: ${{ secrets.DATABASE }}
kustoCluster: ${{ secrets.CLUSTER }}
kustoAadAppId: ${{secrets.APP_ID}}
7 changes: 7 additions & 0 deletions connector/pom.xml
@@ -673,6 +673,13 @@
</transformationSets>
</configuration>
</plugin>
<plugin>
<groupId>org.antipathy</groupId>
<artifactId>mvn-scalafmt_${scala.version.major}</artifactId>
<configuration>
<configLocation>${project.parent.basedir}/.scalafmt.conf</configLocation> <!-- path to config -->
</configuration>
</plugin>
</plugins>
</build>
<repositories>
@@ -84,15 +84,16 @@ case class KustoCoordinates(
ingestionUrl: Option[String] = None)

/**
* *******************************************************************************
* *********************************************************************************
*/
/* NOTE!!! */
/* These options are intended for testing, experimentation and debug. */
/* They may not be used in a production environment */
/* Interface stability is not guaranteed: options may be removed or changed freely */
// NOTE!!!
// These options are intended for testing, experimentation and debug.
// They may not be used in a production environment
// Interface stability is not guaranteed: options may be removed or changed freely
/**
* *******************************************************************************
* **********************************************************************************
*/

private[kusto] object KustoDebugOptions {
private val kustoOptionNames = collection.mutable.Set[String]()

@@ -5,8 +5,7 @@ package com.microsoft.kusto.spark.datasink

import com.azure.data.tables.implementation.models.TableServiceErrorException
import com.fasterxml.jackson.databind.ObjectMapper

import java.util.concurrent.TimeUnit
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import com.microsoft.azure.kusto.data.ClientRequestProperties
import com.microsoft.azure.kusto.ingest.result.{IngestionStatus, OperationStatus}
import com.microsoft.kusto.spark.authentication.KustoAuthentication
@@ -20,20 +19,20 @@ import com.microsoft.kusto.spark.utils.KustoConstants.IngestSkippedTrace
import com.microsoft.kusto.spark.utils.{
ExtendedKustoClient,
KustoClientCache,
KustoConstants,
KustoDataSourceUtils => KDSU
}
import org.apache.spark.SparkContext
import org.apache.spark.util.CollectionAccumulator

import java.time.Instant
import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
import scala.concurrent.{Await, Future, TimeoutException}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Future, TimeoutException}

object FinalizeHelper {
private val myName = this.getClass.getSimpleName

private val mapper = new ObjectMapper().registerModule(new JavaTimeModule())
private[kusto] def finalizeIngestionWhenWorkersSucceeded(
coordinates: KustoCoordinates,
batchIdIfExists: String,
@@ -48,19 +47,18 @@
sinkStartTime: Instant): Unit = {
if (!kustoClient.shouldIngestData(
coordinates,
writeOptions.ingestionProperties,
writeOptions.maybeSparkIngestionProperties,
tableExists,
crp)) {
KDSU.logInfo(myName, s"$IngestSkippedTrace '${coordinates.table}'")
} else {
val mergeTask = Future {
val loggerName = myName
val requestId = writeOptions.requestId
val ingestionInfoString =
s"RequestId: $requestId cluster: '${coordinates.clusterAlias}', " +
s"database: '${coordinates.database}', table: '$tmpTableName' $batchIdIfExists"
KDSU.logInfo(
loggerName,
myName,
s"Polling on ingestion results for requestId: $requestId, will move data to " +
s"destination table when finished")

@@ -69,7 +67,6 @@
partitionsResults.value.asScala.foreach(partitionResult =>
pollOnResult(
partitionResult,
loggerName,
requestId,
writeOptions.timeout.toMillis,
ingestionInfoString,
@@ -86,7 +83,6 @@
results.foreach(partitionResult =>
pollOnResult(
partitionResult,
loggerName,
requestId,
writeOptions.timeout.toMillis,
ingestionInfoString,
@@ -188,7 +184,6 @@

def pollOnResult(
partitionResult: PartitionResult,
loggerName: String,
requestId: String,
timeout: Long,
ingestionInfoString: String,
@@ -201,20 +196,16 @@
finalRes = Some(partitionResult.ingestionResult.getIngestionStatusCollection.get(0))
finalRes
} catch {
// case e: RequestFailedException =>
// KDSU.logWarn(loggerName, "Failed to fetch operation status transiently - will keep polling. " +
// s"RequestId: $requestId. Error: ${ExceptionUtils.getStackTrace(e)}")
// None
case e: TableServiceErrorException =>
KDSU.reportExceptionAndThrow(
loggerName,
myName,
e,
s"TableServiceErrorException : RequestId: $requestId",
shouldNotThrow = true)
None
case e: Exception =>
KDSU.reportExceptionAndThrow(
loggerName,
myName,
e,
s"Failed to fetch operation status. RequestId: $requestId")
None
@@ -226,7 +217,7 @@
val pending = res.isDefined && res.get.status == OperationStatus.Pending
if (pending) {
KDSU.logDebug(
loggerName,
myName,
s"Polling on result for partition: '${partitionResult.partitionId}' in requestId: $requestId, status is-'Pending'")
}
pending
@@ -235,48 +226,58 @@
maxWaitTimeBetweenCallsMillis = KDSU.WriteInitialMaxWaitTime.toMillis.toInt,
maxWaitTimeAfterMinute = KDSU.WriteMaxWaitTime.toMillis.toInt)
.await(timeout, TimeUnit.MILLISECONDS)
finalRes match {
case Some(ingestResults) =>
processIngestionStatusResults(
partitionResult.partitionId,
ingestionInfoString,
shouldThrowOnTagsAlreadyExists,
ingestResults)
case None => throw new RuntimeException("Failed to poll on ingestion status.")
}
}

if (finalRes.isDefined) {
finalRes.get.status match {
case OperationStatus.Pending =>
def processIngestionStatusResults(
partitionId: Int = 0,
ingestionInfoString: String,
shouldThrowOnTagsAlreadyExists: Boolean,
ingestionStatusResult: IngestionStatus): Unit = {
ingestionStatusResult.status match {
case OperationStatus.Pending =>
throw new RuntimeException(
s"Ingestion to Kusto failed on timeout failure. $ingestionInfoString, partition: '$partitionId'")
case OperationStatus.Succeeded =>
KDSU.logInfo(
myName,
s"Ingestion to Kusto succeeded. $ingestionInfoString, partition: '$partitionId', " +
s"from: '${ingestionStatusResult.ingestionSourcePath}' , Operation ${ingestionStatusResult.operationId}")
case OperationStatus.Skipped =>
// TODO: should we throw ?
KDSU.logInfo(
myName,
s"Ingestion to Kusto skipped. $ingestionInfoString, " +
s"partition: '$partitionId', from: '${ingestionStatusResult.ingestionSourcePath}', " +
s"Operation ${ingestionStatusResult.operationId}")
case otherStatus: Any =>
// TODO error code should be added to java client
if (ingestionStatusResult.errorCodeString != "Skipped_IngestByTagAlreadyExists") {
throw new RuntimeException(
s"Ingestion to Kusto failed on timeout failure. $ingestionInfoString, " +
s"partition: '${partitionResult.partitionId}'")
case OperationStatus.Succeeded =>
KDSU.logInfo(
loggerName,
s"Ingestion to Kusto succeeded. $ingestionInfoString, " +
s"partition: '${partitionResult.partitionId}', from: '${finalRes.get.ingestionSourcePath}', " +
s"Operation ${finalRes.get.operationId}")
case OperationStatus.Skipped =>
// TODO: should we throw ?
KDSU.logInfo(
loggerName,
s"Ingestion to Kusto skipped. $ingestionInfoString, " +
s"partition: '${partitionResult.partitionId}', from: '${finalRes.get.ingestionSourcePath}', " +
s"Operation ${finalRes.get.operationId}")
case otherStatus: Any =>
// TODO error code should be added to java client
if (finalRes.get.errorCodeString != "Skipped_IngestByTagAlreadyExists") {
throw new RuntimeException(s"Ingestion to Kusto failed with status '$otherStatus'." +
s" $ingestionInfoString, partition: '${partitionResult.partitionId}'. Ingestion info: '${new ObjectMapper().writerWithDefaultPrettyPrinter
.writeValueAsString(finalRes.get)}'")
} else if (shouldThrowOnTagsAlreadyExists) {
// TODO - think about this logic and other cases that should not throw all (maybe everything that starts with skip? this actually
// seems like a bug in engine that the operation status is not Skipped)
// (Skipped_IngestByTagAlreadyExists is relevant for dedup flow only as in other cases we cancel the ingestion altogether)
throw new RuntimeException(s"Ingestion to Kusto skipped with status '$otherStatus'." +
s" $ingestionInfoString, partition: '${partitionResult.partitionId}'. Ingestion info: '${new ObjectMapper().writerWithDefaultPrettyPrinter
.writeValueAsString(finalRes.get)}'")
}
KDSU.logInfo(
loggerName,
s"Ingestion to Kusto failed. $ingestionInfoString, " +
s"partition: '${partitionResult.partitionId}', from: '${finalRes.get.ingestionSourcePath}', " +
s"Operation ${finalRes.get.operationId}")
}
} else {
throw new RuntimeException("Failed to poll on ingestion status.")
s"Ingestion to Kusto failed with status '$otherStatus'." +
s" $ingestionInfoString, partition: '$partitionId'. Ingestion info: '${mapper.writerWithDefaultPrettyPrinter
.writeValueAsString(ingestionStatusResult)}'")
} else if (shouldThrowOnTagsAlreadyExists) {
// TODO - think about this logic and other cases that should not throw all (maybe everything that starts with skip? this actually
// seems like a bug in engine that the operation status is not Skipped)
// (Skipped_IngestByTagAlreadyExists is relevant for dedup flow only as in other cases we cancel the ingestion altogether)
throw new RuntimeException(s"Ingestion to Kusto skipped with status '$otherStatus'." +
s" $ingestionInfoString, partition: '$partitionId'. Ingestion info: '${new ObjectMapper().writerWithDefaultPrettyPrinter
.writeValueAsString(ingestionStatusResult)}'")
}
KDSU.logInfo(
myName,
s"Ingestion to Kusto failed. $ingestionInfoString, " +
s"partition: '$partitionId', from: '${ingestionStatusResult.ingestionSourcePath}', " +
s"Operation ${ingestionStatusResult.operationId}")
}
}
}
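The `pollOnResult` logic above keeps checking each partition's ingestion status while it is `Pending`, waiting between calls with a growing interval up to a cap, until the status resolves or the overall timeout elapses. A minimal self-contained sketch of that backoff-polling pattern (the names `pollUntilDone`, `check`, and the wait parameters are illustrative, not the connector's API):

```scala
import scala.annotation.tailrec

object BackoffPollSketch {
  // Polls `check` until it yields a result or `timeoutMillis` elapses,
  // doubling the wait between attempts up to `maxWaitMillis` -- the same
  // shape as the doWhile/await loop used around pollOnResult above.
  def pollUntilDone[A](timeoutMillis: Long,
                       initialWaitMillis: Long,
                       maxWaitMillis: Long)(check: () => Option[A]): Option[A] = {
    val deadline = System.currentTimeMillis() + timeoutMillis
    @tailrec
    def loop(wait: Long): Option[A] = check() match {
      case done @ Some(_) => done // status resolved (Succeeded/Failed/Skipped)
      case None if System.currentTimeMillis() + wait > deadline => None // timed out while Pending
      case None =>
        Thread.sleep(wait)
        loop(math.min(wait * 2, maxWaitMillis))
    }
    loop(initialWaitMillis)
  }
}
```

In the real code the `None` (timeout) branch corresponds to the `OperationStatus.Pending` case that `processIngestionStatusResults` turns into a "failed on timeout" exception.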
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@

package com.microsoft.kusto.spark.datasink

import java.util.UUID
import java.util.concurrent.TimeUnit

import com.microsoft.kusto.spark.common.KustoOptions
import com.microsoft.kusto.spark.utils.KustoConstants

import java.util.UUID
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.FiniteDuration

object KustoSinkOptions extends KustoOptions {
Expand Down Expand Up @@ -62,6 +61,9 @@ object KustoSinkOptions extends KustoOptions {
// after which it will move the data to the destination table (the last part is a metadata operation only)
// If set to 'Queued', the write operation finishes after data is processed by the workers, the data may not be completely
// available up until the service finishes loading it and failures on the service side will not propagate to Spark.
// If set to 'KustoStreaming', Kusto streaming ingestion will be used. Streaming ingestion should be used if latency of less than a few seconds is required,
// or to optimize operational processing of many tables where the stream of data into each table is relatively small (a few records per second).
// If a batch exceeds 4 MB, it will be broken into multiple appropriately sized chunks.
val KUSTO_WRITE_MODE: String = newOption("writeMode")

// Provide a temporary table name that will be used for this write operation to achieve transactional write and move
Expand All @@ -72,6 +74,10 @@ object KustoSinkOptions extends KustoOptions {
// https://docs.microsoft.com/azure/data-explorer/kusto/management/auto-delete-policy
// Use this option if you want to persist partial write results (as the failure could be of a single partition)
val KUSTO_TEMP_TABLE_NAME: String = newOption("tempTableName")

// The chunk size that we want to use while iterating over "streaming batch". The default is 4MB.
// Every streaming ingest will be sent in chunks of this size.
val KUSTO_STREAMING_INGEST_SIZE_IN_MB: String = newOption("streamingIngestSizeInMB")
}
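The option comments above say that a batch larger than the streaming limit is broken into appropriately sized chunks. A toy sketch of that size-capped splitting (this is an illustration only, not the connector's implementation, which must also respect record boundaries rather than cutting at arbitrary byte offsets):

```scala
// Illustrative only: split a payload into chunks no larger than a byte budget,
// as described for streaming batches exceeding 4 MB.
object ChunkingSketch {
  def chunk(payload: Array[Byte], maxChunkBytes: Int): Seq[Array[Byte]] = {
    require(maxChunkBytes > 0, "chunk size must be positive")
    payload.grouped(maxChunkBytes).toSeq // each chunk has length <= maxChunkBytes
  }
}
```

A real streaming ingest would cut at the last complete record (e.g. newline) before the budget, so no record straddles two chunks.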

object SinkTableCreationMode extends Enumeration {
@@ -86,28 +92,29 @@ object SchemaAdjustmentMode extends Enumeration {

object WriteMode extends Enumeration {
type WriteMode = Value
val Transactional, Queued = Value
val Transactional, Queued, KustoStreaming = Value
}

case class WriteOptions(
pollingOnDriver: Boolean = false,
tableCreateOptions: SinkTableCreationMode.SinkTableCreationMode =
pollingOnDriver: Boolean = false,
tableCreateOptions: SinkTableCreationMode.SinkTableCreationMode =
SinkTableCreationMode.FailIfNotExist,
isAsync: Boolean = false,
writeResultLimit: String = KustoSinkOptions.NONE_RESULT_LIMIT,
timeZone: String = "UTC",
timeout: FiniteDuration = new FiniteDuration(
isAsync: Boolean = false,
writeResultLimit: String = KustoSinkOptions.NONE_RESULT_LIMIT,
timeZone: String = "UTC",
timeout: FiniteDuration = new FiniteDuration(
KustoConstants.DefaultWaitingIntervalLongRunning.toInt,
TimeUnit.SECONDS),
ingestionProperties: Option[String] = None,
batchLimit: Int = KustoConstants.DefaultBatchingLimit,
requestId: String = UUID.randomUUID().toString,
autoCleanupTime: FiniteDuration =
maybeSparkIngestionProperties: Option[SparkIngestionProperties] = None,
batchLimit: Int = KustoConstants.DefaultBatchingLimit,
requestId: String = UUID.randomUUID().toString,
autoCleanupTime: FiniteDuration =
new FiniteDuration(KustoConstants.DefaultCleaningInterval.toInt, TimeUnit.SECONDS),
maxRetriesOnMoveExtents: Int = 10,
minimalExtentsCountForSplitMerge: Int = 400,
adjustSchema: SchemaAdjustmentMode.SchemaAdjustmentMode = SchemaAdjustmentMode.NoAdjustment,
isTransactionalMode: Boolean = true,
userTempTableName: Option[String] = None,
disableFlushImmediately: Boolean = false,
ensureNoDupBlobs: Boolean = false)
maxRetriesOnMoveExtents: Int = 10,
minimalExtentsCountForSplitMerge: Int = 400,
adjustSchema: SchemaAdjustmentMode.SchemaAdjustmentMode = SchemaAdjustmentMode.NoAdjustment,
writeMode: WriteMode.WriteMode = WriteMode.Transactional,
userTempTableName: Option[String] = None,
disableFlushImmediately: Boolean = false,
ensureNoDupBlobs: Boolean = false,
streamIngestUncompressedMaxSize: Int = KustoConstants.DefaultMaxStreamingBytesUncompressed)
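Putting the new options together: the keys `writeMode` and `streamingIngestSizeInMB` are the ones this PR adds to `KustoSinkOptions`, and `KustoStreaming` is the new `WriteMode` value. The cluster/database/table key names and values below are illustrative assumptions, not taken from this diff:

```scala
// Sketch of a sink configuration selecting the new streaming write mode.
object StreamingSinkConfigSketch {
  val streamingOptions: Map[String, String] = Map(
    "kustoCluster" -> "https://mycluster.kusto.windows.net", // hypothetical
    "kustoDatabase" -> "MyDatabase",                         // hypothetical
    "kustoTable" -> "MyTable",                               // hypothetical
    "writeMode" -> "KustoStreaming",  // new WriteMode introduced by this PR
    "streamingIngestSizeInMB" -> "4"  // new option; 4 MB is the documented default
  )
}
// In a Spark job these would then be passed to the writer, roughly:
//   df.write.format("com.microsoft.kusto.spark.datasource")
//     .options(StreamingSinkConfigSketch.streamingOptions)
//     .mode("Append").save()
```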