Skip to content

Commit

Permalink
[Kafka.DotNet.ksqlDB] - samples - movies provider
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasfabian committed Feb 17, 2021
1 parent ed8030d commit 606ef39
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System.Threading.Tasks;

namespace Kafka.DotNet.ksqlDB.Sample.Providers
{
public interface IKSqlDbRestApiProvider
{
static string KSqlDbUrl { get; }
Task<bool> ExecuteStatementAsync(string ksql);
Task<bool> DropStreamAndTopic(string streamName);
Task<bool> DropTableAndTopic(string tableName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
using System;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Kafka.DotNet.ksqlDB.KSql.RestApi;

namespace Kafka.DotNet.ksqlDB.Sample.Providers
{
public class KSqlDbRestApiProvider : IKSqlDbRestApiProvider
{
private readonly HttpClientFactory httpClientFactory;

internal class KSqlStatement
{
public string ksql { get; set; }
}

public static string KsqlDbUrl { get; } = @"http:\\localhost:8088";

public static KSqlDbRestApiProvider Create(string ksqlDbUrl = null)
{
var uri = new Uri(ksqlDbUrl ?? KsqlDbUrl);

return new KSqlDbRestApiProvider(new HttpClientFactory(uri));
}

public KSqlDbRestApiProvider(HttpClientFactory httpClientFactory)
{
this.httpClientFactory = httpClientFactory ?? throw new ArgumentNullException(nameof(httpClientFactory));
}

public async Task<bool> ExecuteStatementAsync(string ksql)
{
try
{
using var httpClient = httpClientFactory.CreateClient();

var statement = new KSqlStatement
{
ksql = ksql
};

var json = JsonSerializer.Serialize(statement);

var data = new StringContent(json, Encoding.UTF8, "application/json");

httpClient.DefaultRequestHeaders.Accept.Add(
new MediaTypeWithQualityHeaderValue("application/vnd.ksql.v1+json"));

var httpRequestMessage = new HttpRequestMessage(HttpMethod.Post, "/ksql")
{
Content = data
};

var cancellationToken = new CancellationToken();

var httpResponseMessage = await httpClient.SendAsync(httpRequestMessage,
HttpCompletionOption.ResponseHeadersRead,
cancellationToken);
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}

return true;
}

public Task<bool> DropStreamAndTopic(string streamName)
{
var statement = $"DROP STREAM IF EXISTS {streamName} DELETE TOPIC;";

return ExecuteStatementAsync(statement);
}

public Task<bool> DropTableAndTopic(string tableName)
{
var statement = $"DROP TABLE IF EXISTS {tableName} DELETE TOPIC;";

return ExecuteStatementAsync(statement);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
using System;
using System.Threading.Tasks;
using Kafka.DotNet.ksqlDB.Sample.Models.Movies;

namespace Kafka.DotNet.ksqlDB.Sample.Providers
{
public class MoviesProvider
{
private readonly IKSqlDbRestApiProvider restApiProvider;

public MoviesProvider(IKSqlDbRestApiProvider restApiProvider)
{
this.restApiProvider = restApiProvider ?? throw new ArgumentNullException(nameof(restApiProvider));
}

public static readonly string MoviesTableName = "movies";
public static readonly string ActorsTableName = "lead_actor";

public async Task<bool> CreateTablesAsync()
{
var createMoviesTable = $@"CREATE TABLE {MoviesTableName} (
title VARCHAR PRIMARY KEY,
id INT,
release_year INT
) WITH (
KAFKA_TOPIC='{MoviesTableName}',
PARTITIONS=1,
VALUE_FORMAT = 'JSON'
);";

var result = await restApiProvider.ExecuteStatementAsync(createMoviesTable);

var createActorsTable = $@"CREATE TABLE {ActorsTableName} (
title VARCHAR PRIMARY KEY,
actor_name VARCHAR
) WITH (
KAFKA_TOPIC='{ActorsTableName}',
PARTITIONS=1,
VALUE_FORMAT='JSON'
);";

result = await restApiProvider.ExecuteStatementAsync(createActorsTable);

return true;
}

public static readonly Movie Movie1 = new Movie()
{
Id = 1,
Release_Year = 1986,
Title = "Aliens"
};

public static readonly Movie Movie2 = new Movie()
{
Id = 2,
Release_Year = 1998,
Title = "Die Hard"
};

public static readonly Lead_Actor LeadActor1 = new Lead_Actor()
{
Actor_Name = "Sigourney Weaver",
Title = "Aliens"
};

public async Task<bool> InsertMovieAsync(Movie movie)
{
string insert =
$"INSERT INTO {MoviesTableName} ({nameof(Movie.Id)}, {nameof(Movie.Title)}, {nameof(Movie.Release_Year)}) VALUES ({movie.Id}, '{movie.Title}', {movie.Release_Year});";

var result = await restApiProvider.ExecuteStatementAsync(insert);

return result;
}

public async Task<bool> InsertLeadAsync(Lead_Actor actor)
{
string insert =
$"INSERT INTO {ActorsTableName} ({nameof(Lead_Actor.Title)}, {nameof(Lead_Actor.Actor_Name)}) VALUES ('{actor.Title}', '{actor.Actor_Name}');";

var result = await restApiProvider.ExecuteStatementAsync(insert);

return result;
}

public async Task DropTablesAsync()
{
await restApiProvider.DropTableAndTopic(ActorsTableName);
await restApiProvider.DropTableAndTopic(MoviesTableName);
}
}
}

0 comments on commit 606ef39

Please sign in to comment.