Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unable to publish large messages (around 10000 bytes) using LibUvHandler #451

Closed
dheeraj12190 opened this issue Feb 16, 2022 · 4 comments
Closed

Comments

@dheeraj12190
Copy link

I am using this library for one of my project, it works fine with small message size, but when the message size go around 10000 Bytes, Connection to the RabbitMQ closes. and message do not publish. I am using LibUvHandler.

@EmielBruijntjes
Copy link
Member

Can you post the smallest possible example program that demonstrates this issue? Without using threads please.

@dheeraj12190
Copy link
Author

dheeraj12190 commented Feb 17, 2022

Hi thank you for your reply, please find the below code generating a large message and publishing it large number of times

#include <iostream>
#include <uv.h>
#include <string>
#include <amqpcpp.h>
#include <amqpcpp/libuv.h>
#include <thread>
#include <mutex>
#include <iomanip>
#include <unistd.h>
using namespace std;
using namespace AMQP;

class RabbitMQPublisher
{
	protected:
		LibUvHandler *ConnectionHandler;
		TcpConnection *Connection;
		TcpChannel *Channel;

		string exchangeName = "first-publish-exchange";
		string queueName = "first-publish-queue";
		string routingKey = "first-publish-exchange";
	
		mutex ChannelLock;
				
	public:
		RabbitMQPublisher(uv_loop_t *Loop);
		bool Publish(string message);
		~RabbitMQPublisher (){}
};
RabbitMQPublisher::RabbitMQPublisher(uv_loop_t *Loop)
{
	ConnectionHandler = new LibUvHandler(Loop);
	Connection = new TcpConnection(ConnectionHandler, Address("amqp://windows:[email protected]/"));
	Channel = new TcpChannel(Connection);
	
	Channel->declareQueue(queueName);
	Channel->declareExchange(exchangeName, AMQP::direct).onSuccess
	([]{ 
		cout << endl << "Exchange Declared" << endl;
	});
	Channel->bindQueue(exchangeName, queueName, routingKey);
}
bool RabbitMQPublisher :: Publish(string message)
{
	ChannelLock.lock();
	Channel->publish(exchangeName, routingKey, message, message.size());
	ChannelLock.unlock();	
	return true;
}
void RunUvLoop(uv_loop_t *Loop)
{
	uv_run(Loop, UV_RUN_DEFAULT);
}
int main()
{
	uv_loop_t *Loop = uv_default_loop();
	RabbitMQPublisher Publisher(Loop);
	thread t(RunUvLoop, Loop);
	usleep(5000000);
	stringstream ss;
	for (int i=0;i<10000;i++)
		ss << hex << setw(2) << setfill('0') << i;
	string message = ss.str();
	for (int i=0;i<500000;i++)
	{
		if (i % 5000 == 0)
			usleep(1000 * 1000);
		//Publisher.Publish("Hi How Are You!!!");
		Publisher.Publish(message);
	}
	
	cout << "Press Any Key To Stop" << endl;
	getchar();
}

The only thread i have used to run the loop

@EmielBruijntjes
Copy link
Member

EmielBruijntjes commented Feb 17, 2022

Please post an example without using threads. Your issue is most likely used because you access the amqp connection from two threads.

@dheeraj12190
Copy link
Author

dheeraj12190 commented Feb 18, 2022

Now I have removed the thread but still the same behavior exist

#include <iostream>
#include <uv.h>
#include <string>
#include <amqpcpp.h>
#include <amqpcpp/libuv.h>
#include <thread>
#include <mutex>
#include <iomanip>
#include <unistd.h>
using namespace std;
using namespace AMQP;

class RabbitMQPublisher
{
	protected:
		LibUvHandler *ConnectionHandler;
		TcpConnection *Connection;
		TcpChannel *Channel;

		string exchangeName = "first-publish-exchange";
		string queueName = "first-publish-queue";
		string routingKey = "first-publish-exchange";
	
		mutex ChannelLock;
				
	public:
		RabbitMQPublisher(uv_loop_t *Loop);
		virtual bool Publish(string message);
		virtual ~RabbitMQPublisher (){}
};
RabbitMQPublisher::RabbitMQPublisher(uv_loop_t *Loop)
{
	ConnectionHandler = new LibUvHandler(Loop);
	Connection = new TcpConnection(ConnectionHandler, Address("amqp://windows:[email protected]/"));
	Channel = new TcpChannel(Connection);
	
	Channel->declareQueue(queueName);
	Channel->declareExchange(exchangeName, AMQP::direct).onSuccess
	([]{ 
		cout << endl << "Exchange Declared" << endl;
			});
	Channel->bindQueue(exchangeName, queueName, routingKey);
}
bool RabbitMQPublisher :: Publish(string message)
{
	ChannelLock.lock();
	Channel->publish(exchangeName, routingKey, message, message.size());
	ChannelLock.unlock();	
	return true;
}
int main()
{
	uv_loop_t *Loop = uv_default_loop();
	RabbitMQPublisher Publisher(Loop);
	stringstream ss;
	for (int i=0;i<5000;i++)
		ss << hex << setw(2) << setfill('0') << i;
	string message = ss.str();
	for (int i=0;i<5;i++)
	{
		//Publisher.Publish("Hi How Are You!!!");   // If I Send this message then every thing works fine
		Publisher.Publish(message); // But in this case connection to the queue breaks
	}
	uv_run(Loop, UV_RUN_DEFAULT);
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants