-
Notifications
You must be signed in to change notification settings - Fork 876
/
Copy pathProgram.cs
139 lines (125 loc) · 5.81 KB
/
Program.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
// Copyright 2018 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Refer to LICENSE for more information.
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka.SyncOverAsync;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;
namespace Confluent.Kafka.Examples.AvroSpecific
{
class Program
{
static void Main(string[] args)
{
if (args.Length != 3)
{
Console.WriteLine("Usage: .. bootstrapServers schemaRegistryUrl topicName");
return;
}
string bootstrapServers = args[0];
string schemaRegistryUrl = args[1];
string topicName = args[2];
var producerConfig = new ProducerConfig
{
BootstrapServers = bootstrapServers
};
var schemaRegistryConfig = new SchemaRegistryConfig
{
// Note: you can specify more than one schema registry url using the
// schema.registry.url property for redundancy (comma separated list).
// The property name is not plural to follow the convention set by
// the Java implementation.
Url = schemaRegistryUrl
};
var consumerConfig = new ConsumerConfig
{
BootstrapServers = bootstrapServers,
GroupId = "avro-specific-example-group"
};
var avroSerializerConfig = new AvroSerializerConfig
{
// optional Avro serializer properties:
BufferBytes = 100
};
CancellationTokenSource cts = new CancellationTokenSource();
var consumeTask = Task.Run(() =>
{
using (var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig))
using (var consumer =
new ConsumerBuilder<string, User>(consumerConfig)
.SetValueDeserializer(new AvroDeserializer<User>(schemaRegistry).AsSyncOverAsync())
.SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
.Build())
{
consumer.Subscribe(topicName);
try
{
while (true)
{
try
{
var consumeResult = consumer.Consume(cts.Token);
var user = consumeResult.Message.Value;
Console.WriteLine($"key: {consumeResult.Message.Key}, user name: {user.name}, favorite number: {user.favorite_number}, favorite color: {user.favorite_color}, hourly_rate: {user.hourly_rate}");
}
catch (ConsumeException e)
{
Console.WriteLine($"Consume error: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
consumer.Close();
}
}
});
using (var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig))
using (var producer =
new ProducerBuilder<string, User>(producerConfig)
.SetValueSerializer(new AvroSerializer<User>(schemaRegistry, avroSerializerConfig))
.Build())
{
Console.WriteLine($"{producer.Name} producing on {topicName}. Enter user names, q to exit.");
int i = 1;
string text;
while ((text = Console.ReadLine()) != "q")
{
User user = new User { name = text, favorite_color = "green", favorite_number = ++i, hourly_rate = new Avro.AvroDecimal(67.99) };
producer
.ProduceAsync(topicName, new Message<string, User> { Key = text, Value = user })
.ContinueWith(task =>
{
if (!task.IsFaulted)
{
Console.WriteLine($"produced to: {task.Result.TopicPartitionOffset}");
return;
}
// Task.Exception is of type AggregateException. Use the InnerException property
// to get the underlying ProduceException. In some cases (notably Schema Registry
// connectivity issues), the InnerException of the ProduceException will contain
// additional information pertaining to the root cause of the problem. Note: this
// information is automatically included in the output of the ToString() method of
// the ProduceException which is called implicitly in the below.
Console.WriteLine($"error producing message: {task.Exception.InnerException}");
});
}
}
cts.Cancel();
}
}
}