diff --git a/connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala b/connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala index f7f01f5d..bd9cd511 100644 --- a/connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala +++ b/connector/src/main/scala/com/microsoft/kusto/spark/datasink/KustoWriter.scala @@ -47,6 +47,7 @@ import scala.concurrent.ExecutionContext.Implicits.global import scala.util.{Failure, Success, Try} import java.time.ZoneId import java.time.format.DateTimeFormatter +import java.util.concurrent.ConcurrentHashMap object KustoWriter { private val className = this.getClass.getSimpleName @@ -392,6 +393,8 @@ object KustoWriter { batchIdForTracing: String): Unit = { val partitionId = TaskContext.getPartitionId val partitionIdString = TaskContext.getPartitionId.toString + val taskMap = new ConcurrentHashMap[String, BlobWriteResource]() + def ingest( blobResource: BlobWriteResource, size: Long, @@ -486,26 +489,30 @@ object KustoWriter { if (shouldNotCommitBlockBlob) { blobWriter } else { - KDSU.logInfo( - className, - s"Sealing blob in partition $partitionIdString for requestId: '${parameters.writeOptions.requestId}', " + - s"blob number ${row._2}, with size $count") - finalizeBlobWrite(blobWriter) - ingest( - blobWriter, - blobWriter.csvWriter.getCounter, - blobWriter.sas, - flushImmediately = !parameters.writeOptions.disableFlushImmediately, - curBlobUUID, - kustoClient) - curBlobUUID = UUID.randomUUID().toString - createBlobWriter( - parameters.coordinates, - parameters.tmpTableName, - kustoClient, - partitionIdString, - row._2, - curBlobUUID) + if (parameters.writeOptions.ensureNoDupBlobs) { + taskMap.put(curBlobUUID, blobWriter) + } else { + KDSU.logInfo( + className, + s"Sealing blob in partition $partitionIdString for requestId: '${parameters.writeOptions.requestId}', " + + s"blob number ${row._2}, with size $count") + finalizeBlobWrite(blobWriter) + ingest( + blobWriter, + blobWriter.csvWriter.getCounter, + blobWriter.sas, + flushImmediately = !parameters.writeOptions.disableFlushImmediately, + curBlobUUID, + kustoClient) + curBlobUUID = UUID.randomUUID().toString + createBlobWriter( + parameters.coordinates, + parameters.tmpTableName, + kustoClient, + partitionIdString, + row._2, + curBlobUUID) + } } } @@ -515,13 +522,22 @@ object KustoWriter { s"requestId: '${parameters.writeOptions.requestId}' ") finalizeBlobWrite(lastBlobWriter) if (lastBlobWriter.csvWriter.getCounter > 0) { - ingest( - lastBlobWriter, - lastBlobWriter.csvWriter.getCounter, - lastBlobWriter.sas, - flushImmediately = false, - curBlobUUID, - kustoClient) + if (parameters.writeOptions.ensureNoDupBlobs) { + taskMap.put(curBlobUUID, lastBlobWriter) + } else { + ingest( + lastBlobWriter, + lastBlobWriter.csvWriter.getCounter, + lastBlobWriter.sas, + flushImmediately = false, + curBlobUUID, + kustoClient) + } + } + if (parameters.writeOptions.ensureNoDupBlobs && taskMap.size() > 0) { + taskMap.forEach((uuid, bw) => { + ingest(bw, bw.csvWriter.getCounter, bw.sas, flushImmediately = false, uuid, kustoClient) + }) } } diff --git a/pom.xml b/pom.xml index f4c3d4cd..8aa8bde4 100644 --- a/pom.xml +++ b/pom.xml @@ -8,7 +8,7 @@ pom ${revision} - 5.2.2 + 5.2.3 2.12 1.1.1640084764.9f463a9