Skip to content

Commit

Permalink
[Kafka.DotNet.ksqlDB] - samples - movies provider
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasfabian committed Feb 17, 2021
1 parent 606ef39 commit 2910a06
Showing 1 changed file with 21 additions and 7 deletions.
28 changes: 21 additions & 7 deletions Samples/Kafka/Kafka.DotNet.ksqlDB.Sample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using Kafka.DotNet.ksqlDB.KSql.RestApi;
using Kafka.DotNet.ksqlDB.Sample.Providers;
using K = Kafka.DotNet.ksqlDB.KSql.Query.Functions.KSql;

namespace Kafka.DotNet.ksqlDB.Sample
Expand All @@ -23,24 +25,36 @@ public static class Program
public static async Task Main(string[] args)
{
var ksqlDbUrl = @"http:\\localhost:8088";

var httpClientFactory = new HttpClientFactory(new Uri(ksqlDbUrl));
var restApiProvider = new KSqlDbRestApiProvider(httpClientFactory);
var moviesProvider = new MoviesProvider(restApiProvider);

await moviesProvider.CreateTablesAsync();

var contextOptions = new KSqlDBContextOptions(ksqlDbUrl);

await using var context = new KSqlDBContext(contextOptions);

using var disposable = context.CreateQueryStream<Tweet>()
.Where(p => p.Message != "Hello world" || p.Id == 1)
.Where(c => K.Functions.Like(c.Message.ToLower(), "%ALL%".ToLower()))
using var disposable = context.CreateQueryStream<Movie>()
.Where(p => p.Title != "E.T." || p.Id == 1)
.Where(c => K.Functions.Like(c.Title.ToLower(), "%ALL%".ToLower()))
.Where(p => p.RowTime >= 1510923225000) //AND RowTime >= 1510923225000
.Select(l => new { l.Id, l.Message, l.RowTime })
.Select(l => new { l.Id, l.Title, l.RowTime })
.Take(2) // LIMIT 2
.ToObservable() // client side processing starts here lazily after subscription
.ObserveOn(TaskPoolScheduler.Default)
.Subscribe(onNext: tweetMessage =>
.Subscribe(onNext: movie =>
{
Console.WriteLine($"{nameof(Tweet)}: {tweetMessage.Id} - {tweetMessage.Message}");
Console.WriteLine($"{nameof(Movie)}: {movie.Id} - {movie.Title}");
Console.WriteLine();
}, onError: error => { Console.WriteLine($"Exception: {error.Message}"); }, onCompleted: () => Console.WriteLine("Completed"));


await moviesProvider.InsertMovieAsync(MoviesProvider.Movie1);
await moviesProvider.InsertMovieAsync(MoviesProvider.Movie2);
await moviesProvider.InsertLeadAsync(MoviesProvider.LeadActor1);

Console.WriteLine("Press any key to stop the subscription");

Console.ReadKey();
Expand All @@ -54,7 +68,7 @@ private static IDisposable ClientSideBatching(KSqlDBContext context)
.ToObservable()
.Buffer(TimeSpan.FromMilliseconds(250), 100)
.Where(c => c.Count > 0)
//.ObserveOn(System.Reactive.Concurrency.DispatcherScheduler.Current)
//.ObserveOn(System.Reactive.Concurrency.DispatcherScheduler.Current) //WPF
.Subscribe(tweets =>
{
foreach (var tweet in tweets)
Expand Down

0 comments on commit 2910a06

Please sign in to comment.