Skip to content

Commit

Permalink
[Kafka.DotNet.ksqlDB] - TimeStampToString
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasfabian committed Feb 17, 2021
1 parent 2910a06 commit 11ae6d3
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 3 deletions.
2 changes: 1 addition & 1 deletion Joker.Kafka/ChangeLog.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@
- ```KSqldbProvider<T>``` - ksqldb REST api provider for push queries (```KSqlDbQueryProvider<T>```, ```KSqlDbQueryStreamProvider<T>```)

### v0.4.0 (not released):

- TIMESTAMPTOSTRING

# TODO:
- https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/scalar-functions/#date-and-time
Expand Down
37 changes: 36 additions & 1 deletion Joker.Kafka/KSql/Query/Functions/KSqlFunctionsExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ public static decimal Abs(this KSqlFunctions kSqlFunctions, decimal input)

#region Ceil

public static int Ceil(this KSqlFunctions kSqlFunctions, int input)
{
throw new InvalidOperationException(ServerSideOperationErrorMessage);
}

public static long Ceil(this KSqlFunctions kSqlFunctions, long input)
{
throw new InvalidOperationException(ServerSideOperationErrorMessage);
}

public static float Ceil(this KSqlFunctions kSqlFunctions, float input)
{
throw new InvalidOperationException(ServerSideOperationErrorMessage);
Expand All @@ -63,6 +73,16 @@ public static decimal Ceil(this KSqlFunctions kSqlFunctions, decimal input)

#region Floor

public static int Floor(this KSqlFunctions kSqlFunctions, int input)
{
throw new InvalidOperationException(ServerSideOperationErrorMessage);
}

public static long Floor(this KSqlFunctions kSqlFunctions, long input)
{
throw new InvalidOperationException(ServerSideOperationErrorMessage);
}

public static float Floor(this KSqlFunctions kSqlFunctions, float input)
{
throw new InvalidOperationException(ServerSideOperationErrorMessage);
Expand All @@ -82,6 +102,7 @@ public static decimal Floor(this KSqlFunctions kSqlFunctions, decimal input)

#region Round

//TODO:returns BIGINT
public static float Round(this KSqlFunctions kSqlFunctions, float input)
{
throw new InvalidOperationException(ServerSideOperationErrorMessage);
Expand Down Expand Up @@ -113,12 +134,16 @@ public static decimal Round(this KSqlFunctions kSqlFunctions, decimal input, int
}

#endregion


#region Random

public static double Random(this KSqlFunctions kSqlFunctions)
{
throw new InvalidOperationException(ServerSideOperationErrorMessage);
}

#endregion

#region Sign

public static int Sign(this KSqlFunctions kSqlFunctions, short input)
Expand Down Expand Up @@ -201,6 +226,16 @@ public static string DateToString(this KSqlFunctions kSqlFunctions, int epochDay
throw new InvalidOperationException(ServerSideOperationErrorMessage);
}

public static string TimeStampToString(this KSqlFunctions kSqlFunctions, long epochMilli, string formatPattern)
{
throw new InvalidOperationException(ServerSideOperationErrorMessage);
}

public static string TimeStampToString(this KSqlFunctions kSqlFunctions, long epochMilli, string formatPattern, string timeZone)
{
throw new InvalidOperationException(ServerSideOperationErrorMessage);
}

#endregion
}
}
1 change: 1 addition & 0 deletions Joker.Kafka/KSql/Query/Visitors/KSqlFunctionVisitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ protected override Expression VisitMethodCall(MethodCallExpression methodCallExp
case nameof(KSqlFunctionsExtensions.RPad):
case nameof(KSqlFunctionsExtensions.Substring):
case nameof(KSqlFunctionsExtensions.DateToString):
case nameof(KSqlFunctionsExtensions.TimeStampToString):
Append($"{methodInfo.Name.ToUpper()}");
PrintFunctionArguments(methodCallExpression.Arguments.Skip(1));
break;
Expand Down
18 changes: 17 additions & 1 deletion Joker.Kafka/Wiki.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ Omitting select is equivalent to SELECT *
| DOUBLE | double |
| BOOLEAN | bool |
| ```ARRAY<ElementType>``` | C#Type[] |
| ```MAP<ElementType, ElementType>``` | IDictionary<C#Type, C#Type> |

Array type mapping example (available from v0.3.0):
```
Expand Down Expand Up @@ -388,7 +389,7 @@ UCASE(Latitude) != 'HI'

# v0.2.0
```
Install-Package Kafka.DotNet.ksqlDB -Version 0.2.0-RC1
Install-Package Kafka.DotNet.ksqlDB -Version 0.2.0
```

### Having (v0.2.0)
Expand Down Expand Up @@ -786,6 +787,9 @@ FROM Tweets GROUP BY Id EMIT CHANGES;
```

# v0.4.0-preview (work in progress)
```
Install-Package Kafka.DotNet.ksqlDB -Version 0.4.0-rc.1
```
### Maps (v0.4.0)
```C#
var dictionary = new Dictionary<string, int>()
Expand Down Expand Up @@ -836,6 +840,18 @@ Generated KSQL:
DATETOSTRING(18672, 'yyyy-MM-dd')
```

#### TIMESTAMPTOSTRING (v0.4.0)
```C#
new KSqlDBContext(ksqlDbUrl).CreateQueryStream<Movie>()
.Select(c => K.Functions.TimeStampToString(c.RowTime, "yyyy-MM-dd''T''HH:mm:ssX"))
```

Generated KSQL:
```
SELECT DATETOSTRING(1613503749145, 'yyyy-MM-dd''T''HH:mm:ssX')
FROM tweets EMIT CHANGES;
```

**TODO:**
- map type
- missing [aggregation functions](https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/aggregate-functions/) and [scalar functions](https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/scalar-functions/)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,52 @@ public void DateToString_BuildKSql_PrintsFunction()
kSqlFunction.Should().BeEquivalentTo($"DATETOSTRING({epochDays}, '{format}')");
}

[TestMethod]
public void TimeStampToString_BuildKSql_PrintsFunction()
{
//Arrange
long epochMilli = 1613503749145;
string format = "yyyy-MM-dd HH:mm:ss.SSS";
Expression<Func<Tweet, string>> expression = _ => KSqlFunctions.Instance.TimeStampToString(epochMilli, format);

//Act
var kSqlFunction = ClassUnderTest.BuildKSql(expression);

//Assert
kSqlFunction.Should().BeEquivalentTo($"TIMESTAMPTOSTRING({epochMilli}, '{format}')");
}

[TestMethod]
public void TimeStampToStringFormatWithBackTicks_BuildKSql_PrintsFunction()
{
//Arrange
long epochMilli = 1613503749145;
string format = "yyyy-MM-dd''T''HH:mm:ssX";
Expression<Func<Tweet, string>> expression = _ => KSqlFunctions.Instance.TimeStampToString(epochMilli, format);

//Act
var kSqlFunction = ClassUnderTest.BuildKSql(expression);

//Assert
kSqlFunction.Should().BeEquivalentTo($"TIMESTAMPTOSTRING({epochMilli}, '{format}')");
}

[TestMethod]
public void TimeStampToStringWithTimeZone_BuildKSql_PrintsFunction()
{
//Arrange
long epochMilli = 1613503749145;
string format = "yyyy-MM-dd''T''HH:mm:ssX";
string timeZone = "Europe/London";
Expression<Func<Tweet, string>> expression = _ => KSqlFunctions.Instance.TimeStampToString(epochMilli, format, timeZone);

//Act
var kSqlFunction = ClassUnderTest.BuildKSql(expression);

//Assert
kSqlFunction.Should().BeEquivalentTo($"TIMESTAMPTOSTRING({epochMilli}, '{format}', '{timeZone}')");
}

#endregion

#region Dynamic
Expand Down

0 comments on commit 11ae6d3

Please sign in to comment.