escape in json (#99)
* escape in json

* Update CountingCsvWriter.scala

* better design

* rename

* tests

* fix test
ohadbitt authored Dec 5, 2019
1 parent 9b9371f commit 4a51ed8
Showing 5 changed files with 195 additions and 143 deletions.

This file was deleted.

@@ -194,7 +194,7 @@ object KustoWriter {
     val writer = new OutputStreamWriter(gzip, StandardCharsets.UTF_8)

     val buffer: BufferedWriter = new BufferedWriter(writer, GZIP_BUFFER_SIZE)
-    val csvWriter = CountingCsvWriter(buffer)
+    val csvWriter = CountingWriter(buffer)
     BlobWriteResource(buffer, gzip, csvWriter, currentBlob, currentSas)
   }

@@ -265,7 +265,7 @@ object KustoWriter {
     blobWriteResource.gzip.close()
   }

-  def writeRowAsCSV(row: InternalRow, schema: StructType, dateFormat: FastDateFormat, writer: CountingCsvWriter): Unit = {
+  def writeRowAsCSV(row: InternalRow, schema: StructType, dateFormat: FastDateFormat, writer: CountingWriter): Unit = {
     val schemaFields: Array[StructField] = schema.fields

     if (!row.isNullAt(0)) {
@@ -282,33 +282,36 @@ object KustoWriter {
     writer.newLine()
   }

+  def writeJsonField(json: String, writer: Writer, nested: Boolean): Unit = {
+    if (nested) {
+      writer.writeUnescaped(json)
+    } else {
+      writer.writeStringField(json)
+    }
+  }
+
   // This method does not check for null at the current row idx and should be checked before !
-  private def writeField(row: SpecializedGetters, fieldIndexInRow: Int, dataType: DataType, dateFormat: FastDateFormat, csvWriter: CountingCsvWriter, nested: Boolean): Unit = {
+  private def writeField(row: SpecializedGetters, fieldIndexInRow: Int, dataType: DataType, dateFormat: FastDateFormat, writer: Writer, nested: Boolean): Unit = {
     dataType match {
-      case StringType => GetStringFromUTF8(row.getUTF8String(fieldIndexInRow), nested, csvWriter)
-      case DateType => csvWriter.writeStringField(DateTimeUtils.toJavaDate(row.getInt(fieldIndexInRow)).toString, nested)
-      case TimestampType => csvWriter.writeStringField(dateFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(fieldIndexInRow))), nested)
-      case BooleanType => csvWriter.write(row.getBoolean(fieldIndexInRow).toString)
-      case structType: StructType => convertStructToCsv(row.getStruct(fieldIndexInRow, structType.length), structType, dateFormat, csvWriter, nested)
-      case arrType: ArrayType => convertArrayToCsv(row.getArray(fieldIndexInRow), arrType.elementType, dateFormat, csvWriter, nested)
-      case mapType: MapType => convertMapToCsv(row.getMap(fieldIndexInRow), mapType, dateFormat, csvWriter, nested)
+      case StringType => writeStringFromUTF8(row.getUTF8String(fieldIndexInRow), writer)
+      case DateType => writer.writeStringField(DateTimeUtils.toJavaDate(row.getInt(fieldIndexInRow)).toString)
+      case TimestampType => writer.writeStringField(dateFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(fieldIndexInRow))))
+      case BooleanType => writer.write(row.getBoolean(fieldIndexInRow).toString)
+      case structType: StructType => writeJsonField(convertStructToJson(row.getStruct(fieldIndexInRow, structType.length), structType, dateFormat), writer, nested)
+      case arrType: ArrayType => writeJsonField(convertArrayToJson(row.getArray(fieldIndexInRow), arrType.elementType, dateFormat), writer, nested)
+      case mapType: MapType => writeJsonField(convertMapToJson(row.getMap(fieldIndexInRow), mapType, dateFormat), writer, nested)
       case ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType =>
-        csvWriter.write(row.get(fieldIndexInRow, dataType).toString)
-      case _: DecimalType => csvWriter.write(row.get(fieldIndexInRow, dataType).toString)
-      case _ => csvWriter.writeStringField(row.get(fieldIndexInRow, dataType).toString, nested)
+        writer.write(row.get(fieldIndexInRow, dataType).toString)
+      case _: DecimalType => writer.write(row.get(fieldIndexInRow, dataType).toString)
+      case _ => writer.writeStringField(row.get(fieldIndexInRow, dataType).toString)
     }
   }

-  private def convertStructToCsv(row: InternalRow, schema: StructType, dateFormat: FastDateFormat, csvWriter: CountingCsvWriter, nested: Boolean): Unit = {
+  private def convertStructToJson(row: InternalRow, schema: StructType, dateFormat: FastDateFormat): String = {
     val fields = schema.fields
     if (fields.length != 0) {
-
-      if (!nested) {
-        csvWriter.write('"')
-      }
-
-      csvWriter.write('{')
+      val writer = EscapedWriter(new CharArrayWriter())
+      writer.write('{')

       var x = 0
       var isNull = true
@@ -322,54 +325,48 @@ object KustoWriter {

       while (x < fields.length) {
         if (!row.isNullAt(x)) {
-          csvWriter.write(',')
+          writer.write(',')
           writeStructField(x)
         }
         x += 1
       }
-      csvWriter.write('}')
+      writer.write('}')

-      if (!nested) {
-        csvWriter.write('"')
-      }
-
-      def writeStructField(idx: Int): Unit = {
-        csvWriter.writeStringField(fields(idx).name, nested = true)
-        csvWriter.write(':')
-        writeField(row, idx, fields(idx).dataType, dateFormat, csvWriter, nested = true)
-      }
-    }
-  }
+      def writeStructField(idx: Int): Unit = {
+        writer.writeStringField(fields(idx).name)
+        writer.write(':')
+        writeField(row, idx, fields(idx).dataType, dateFormat, writer, nested = true)
+      }
+
+      writer.out.toString
+    } else {
+      "{}"
+    }
+  }

-  private def convertArrayToCsv(ar: ArrayData, fieldsType: DataType, dateFormat: FastDateFormat, csvWriter: CountingCsvWriter, nested: Boolean): Unit = {
-    if (ar.numElements() == 0) csvWriter.write("[]") else {
-      if (!nested) {
-        csvWriter.write('"')
-      }
-
-      csvWriter.write('[')
-      if (ar.isNullAt(0)) csvWriter.write("null") else writeField(ar, fieldIndexInRow = 0, fieldsType, dateFormat, csvWriter, nested = true)
+  private def convertArrayToJson(ar: ArrayData, fieldsType: DataType, dateFormat: FastDateFormat): String = {
+    if (ar.numElements() == 0) "[]" else {
+      val writer = EscapedWriter(new CharArrayWriter())
+
+      writer.write('[')
+      if (ar.isNullAt(0)) writer.write("null") else writeField(ar, fieldIndexInRow = 0, fieldsType, dateFormat, writer, nested = true)
       for (x <- 1 until ar.numElements()) {
-        csvWriter.write(',')
-        if (ar.isNullAt(x)) csvWriter.write("null") else writeField(ar, x, fieldsType, dateFormat, csvWriter, nested = true)
+        writer.write(',')
+        if (ar.isNullAt(x)) writer.write("null") else writeField(ar, x, fieldsType, dateFormat, writer, nested = true)
       }
-      csvWriter.write(']')
+      writer.write(']')

-      if (!nested) {
-        csvWriter.write('"')
-      }
+      writer.out.toString
     }
   }

-  private def convertMapToCsv(map: MapData, fieldsType: MapType, dateFormat: FastDateFormat, csvWriter: CountingCsvWriter, nested: Boolean): Unit = {
+  private def convertMapToJson(map: MapData, fieldsType: MapType, dateFormat: FastDateFormat): String = {
     val keys = map.keyArray()
     val values = map.valueArray()

-    if (!nested) {
-      csvWriter.write('"')
-    }
+    val writer = EscapedWriter(new CharArrayWriter())

-    csvWriter.write('{')
+    writer.write('{')

     var x = 0
     var isNull = true
@@ -383,32 +380,28 @@ object KustoWriter {

     while (x < map.keyArray().numElements()) {
       if (!values.isNullAt(x)) {
-        csvWriter.write(',')
+        writer.write(',')
         writeMapField(x)
       }
       x += 1
     }

-    csvWriter.write('}')
-    if (!nested) {
-      csvWriter.write('"')
-    }
+    writer.write('}')

     def writeMapField(idx: Int): Unit = {
-      csvWriter.write('"')
-      writeField(keys, fieldIndexInRow = idx, dataType = fieldsType.keyType, dateFormat = dateFormat, csvWriter = csvWriter, nested = false)
-      csvWriter.write('"')
-      csvWriter.write(':')
-      writeField(values, fieldIndexInRow = idx, dataType = fieldsType.valueType, dateFormat = dateFormat, csvWriter = csvWriter, nested = true)
+      writeField(keys, fieldIndexInRow = idx, dataType = fieldsType.keyType, dateFormat = dateFormat, writer, nested = true)
+      writer.write(':')
+      writeField(values, fieldIndexInRow = idx, dataType = fieldsType.valueType, dateFormat = dateFormat, writer = writer, nested = true)
     }
+    writer.out.toString
   }

-  private def GetStringFromUTF8(str: UTF8String, nested: Boolean, writer: CountingCsvWriter): Unit = {
-    writer.writeStringField(str.toString, nested)
+  private def writeStringFromUTF8(str: UTF8String, writer: Writer): Unit = {
+    writer.writeStringField(str.toString)
   }
 }

-case class BlobWriteResource(writer: BufferedWriter, gzip: GZIPOutputStream, csvWriter: CountingCsvWriter, blob: CloudBlockBlob, sas: String)
+case class BlobWriteResource(writer: BufferedWriter, gzip: GZIPOutputStream, csvWriter: CountingWriter, blob: CloudBlockBlob, sas: String)

 case class KustoWriteResource(authentication: KustoAuthentication,
                               coordinates: KustoCoordinates,
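The net effect of the KustoWriter changes above: struct, array, and map columns are first rendered to a properly escaped JSON string (via EscapedWriter, defined in the new file below), and only the outermost value is CSV-quoted by CountingWriter. A minimal sketch of that two-layer escaping, using only the writer classes added in this commit; the demo object and sample JSON value are illustrative, not part of the commit:

import java.io.CharArrayWriter
import com.microsoft.kusto.spark.datasink.CountingWriter

object WriteJsonFieldDemo extends App {
  // Stand-in for convertStructToJson output: already JSON-escaped.
  val json = """{"a":"line1\nline2"}"""

  // Top level (nested = false): writeJsonField CSV-quotes the whole JSON blob,
  // doubling embedded quotes, so it lands in a single CSV cell.
  val top = CountingWriter(new CharArrayWriter())
  top.writeStringField(json)
  println(top.out.toString) // "{""a"":""line1\nline2""}"
  println(top.getCounter)   // 26 chars emitted; callers use this to cap blob size

  // Nested (nested = true): the value is already escaped, so it is written as is.
  val inner = CountingWriter(new CharArrayWriter())
  inner.writeUnescaped(json)
  println(inner.out.toString) // {"a":"line1\nline2"}
}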
@@ -0,0 +1,93 @@
package com.microsoft.kusto.spark.datasink

// Shared abstraction over a java.io.Writer, implemented by the CSV (counting)
// and JSON (escaping) writers below.
trait Writer {
  val out: java.io.Writer
  def write(c: Char): Unit
  def write(str: String): Unit
  def writeStringField(str: String): Unit
  def writeUnescaped(str: String): Unit = {
    out.write(str)
  }
}

// CSV writer that tracks how many characters it has emitted, so the caller can
// cap blob sizes without querying the underlying stream.
case class CountingWriter(out: java.io.Writer) extends Writer {
  val newLineSep: String = System.lineSeparator()
  val newLineSepLength: Int = newLineSep.length
  var bytesCounter: Long = 0L

  def newLine(): Unit = {
    out.write(newLineSep)
    bytesCounter += newLineSepLength
  }

  def write(c: Char): Unit = {
    out.write(c)
    bytesCounter += 1
  }

  def write(str: String): Unit = {
    out.write(str)
    bytesCounter += str.length
  }

  def writeStringField(str: String): Unit = {
    if (str.length > 0) {
      out.write('"')
      // Counter accounting: += 2 covers the opening and closing quotes; each
      // embedded quote is doubled (CSV escaping) and counted with += 1; the
      // final += str.length covers the field's own characters. Note that this
      // counts chars, not UTF-8 bytes.
      bytesCounter += 2
      for (c <- str) {
        if (c == '"') {
          out.write("\"\"")
          bytesCounter += 1
        } else {
          out.write(c)
        }
      }

      out.write('"')
      bytesCounter += str.length
    }
  }

  def getCounter: Long = bytesCounter

  def resetCounter(): Unit = {
    bytesCounter = 0
  }
}

// JSON writer: emits backslash escapes for quotes, backslashes, and control
// characters so nested values remain valid JSON inside a CSV cell.
case class EscapedWriter(out: java.io.Writer) extends Writer {
  def write(c: Char): Unit = {
    out.write(c)
  }

  def write(str: String): Unit = {
    for (c <- str) {
      // Guard the table lookup: only ASCII chars have entries; everything else
      // (including non-ASCII text) is written through unchanged.
      val escaped = if (c < 128) EscapedWriter.escapeTable(c) else 0
      if (escaped != 0) {
        out.write('\\')
        out.write(escaped)
      } else {
        out.write(c)
      }
    }
  }

  def writeStringField(str: String): Unit = {
    if (str.length > 0) {
      out.write('"')
      write(str)
      out.write('"')
    }
  }
}

object EscapedWriter {
  // Maps an ASCII char to the letter of its JSON escape sequence, or 0 for none.
  val escapeTable: Array[Int] = Array.fill[Int](128)(0)
  escapeTable('"') = '"'
  escapeTable('\\') = '\\'
  escapeTable('\n') = 'n'
  escapeTable('\r') = 'r'
  escapeTable('\b') = 'b'
  escapeTable('\t') = 't'
  escapeTable('\f') = 'f'
}
@@ -106,7 +106,7 @@ class KustoClient(val clusterAlias: String, val engineKcsb: ConnectionStringBuil
     import coordinates._

     val mergeTask = Future {
-      KDSU.logInfo(myName, s"Polling on ingestion results, will merge data to destination table when finished")
+      KDSU.logInfo(myName, s"Polling on ingestion results, will move data to destination table when finished")

       try {
         partitionsResults.value.asScala.foreach {
