-
Notifications
You must be signed in to change notification settings - Fork 20
/
Copy pathRabbitMqTests.cs
137 lines (115 loc) · 4.84 KB
/
RabbitMqTests.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
using System;
using System.IO;
using System.Text;
using System.Threading;
using NUnit.Framework;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace RabbitMq.Tests
{
/// <summary>
/// Exploratory integration tests that talk to a RabbitMQ broker on <c>localhost</c>.
/// </summary>
/// <remarks>
/// The tests are not self-contained. Only <see cref="Register_durable_Exchange_and_Queue"/>
/// declares the exchange and queue; every other test uses them without declaring them, and
/// the consuming tests expect messages that the publishing tests left in the queue.
/// </remarks>
[TestFixture]
public class RabbitMqTests
{
    private const string ExchangeName = "test.exchange";
    private const string QueueName = "test.queue";

    /// <summary>Payload published and expected by every test in this fixture.</summary>
    private const string MessageText = "Hello, World!";

    /// <summary>Upper bound, in milliseconds, for waiting on a single delivery.</summary>
    private const int ReceiveTimeoutMs = 5000;

    private readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory { HostName = "localhost" };

    /// <summary>
    /// Publishes one persistent <see cref="MessageText"/> message to <see cref="ExchangeName"/>,
    /// using <see cref="QueueName"/> as the routing key.
    /// </summary>
    /// <param name="channel">Open channel to publish on.</param>
    private static void PublishPersistentMessage(IModel channel)
    {
        var props = channel.CreateBasicProperties();
        props.SetPersistent(true);
        var msgBody = Encoding.UTF8.GetBytes(MessageText);
        channel.BasicPublish(ExchangeName, routingKey: QueueName, basicProperties: props, body: msgBody);
    }

    /// <summary>
    /// Declares the durable direct exchange and durable queue, and binds the queue to the
    /// exchange with the queue name as routing key.
    /// </summary>
    [Test]
    public void Register_durable_Exchange_and_Queue()
    {
        using (IConnection conn = rabbitMqFactory.CreateConnection())
        using (IModel channel = conn.CreateModel())
        {
            channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);
            channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
            channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);
        }
    }

    /// <summary>Publishes a single persistent message to the test exchange.</summary>
    [Test]
    public void Publish_persistent_message_to_test_exchange()
    {
        using (IConnection conn = rabbitMqFactory.CreateConnection())
        using (IModel channel = conn.CreateModel())
        {
            PublishPersistentMessage(channel);
        }
    }

    /// <summary>
    /// Pulls one message from the queue with <c>BasicGet</c> (auto-acknowledged) and checks its body.
    /// </summary>
    [Test]
    public void Get_message_from_queue()
    {
        using (IConnection conn = rabbitMqFactory.CreateConnection())
        using (IModel channel = conn.CreateModel())
        {
            BasicGetResult msgResponse = channel.BasicGet(QueueName, noAck: true);

            // BasicGet yields null when the queue has no message; fail with a readable
            // assertion instead of a NullReferenceException on msgResponse.Body.
            Assert.That(msgResponse, Is.Not.Null, "No message available in queue '" + QueueName + "'");

            var msgBody = Encoding.UTF8.GetString(msgResponse.Body);
            Assert.That(msgBody, Is.EqualTo(MessageText));
        }
    }

    /// <summary>
    /// Subscribes to the queue and checks the body of the first delivered message.
    /// </summary>
    [Test]
    public void Consume_one_message_from_Queue_Subscription()
    {
        using (IConnection conn = rabbitMqFactory.CreateConnection())
        using (IModel channel = conn.CreateModel())
        {
            var consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume(QueueName, noAck: true, consumer: consumer);

            // Bounded wait: an untimed Dequeue() would hang the whole test run
            // if nothing is ever delivered.
            BasicDeliverEventArgs msgResponse;
            bool received = consumer.Queue.Dequeue(ReceiveTimeoutMs, out msgResponse);
            Assert.That(received, Is.True,
                "No message delivered from queue '" + QueueName + "' within " + ReceiveTimeoutMs + " ms");

            var msgBody = Encoding.UTF8.GetString(msgResponse.Body);
            Assert.That(msgBody, Is.EqualTo(MessageText));
        }
    }

    /// <summary>Publishes five persistent messages to the test exchange.</summary>
    [Test]
    public void Publish_5_messages_to_test_exchange()
    {
        using (IConnection conn = rabbitMqFactory.CreateConnection())
        using (IModel channel = conn.CreateModel())
        {
            for (var i = 0; i < 5; i++)
            {
                PublishPersistentMessage(channel);
            }
        }
    }

    /// <summary>
    /// Subscribes to the queue while a background work item publishes one message per second
    /// for about five seconds and then closes the channel; the test prints every received
    /// message until the consumer queue reports end-of-stream.
    /// </summary>
    [Test]
    public void Consume_messages_from_Queue_Subscription()
    {
        using (IConnection conn = rabbitMqFactory.CreateConnection())
        using (IModel channel = conn.CreateModel())
        {
            var consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume(QueueName, noAck: true, consumer: consumer);

            // Captured so a publisher failure is reported by this test instead of escaping
            // as an unhandled exception on a thread-pool thread.
            Exception publisherError = null;

            ThreadPool.QueueUserWorkItem(_ =>
            {
                try
                {
                    var started = DateTime.UtcNow;
                    while (DateTime.UtcNow - started < TimeSpan.FromSeconds(5))
                    {
                        PublishPersistentMessage(channel);
                        Thread.Sleep(1000);
                    }
                }
                catch (Exception ex)
                {
                    publisherError = ex;
                }
                finally
                {
                    // Always close the channel: that is what ends the consume loop below.
                    if (channel.IsOpen)
                    {
                        channel.Close();
                    }
                }
            });

            while (true)
            {
                try
                {
                    var msgResponse = consumer.Queue.Dequeue(); //blocking
                    var msgBody = Encoding.UTF8.GetString(msgResponse.Body);
                    Console.WriteLine("Received Message: " + msgBody);
                    Thread.Sleep(1000);
                }
                catch (EndOfStreamException)
                {
                    // Raised by Dequeue once the channel has been closed.
                    Console.WriteLine("Channel was closed, Exiting...");
                    break;
                }
            }

            Assert.That(publisherError, Is.Null, "Publisher thread failed: " + publisherError);
        }
    }
}
}