Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check version when serverProperties is not empty and < 3.11.0 #345

Merged
merged 3 commits into from
Jul 30, 2024

Conversation

Gsantomaggio
Copy link
Member

@Gsantomaggio Gsantomaggio commented Jul 29, 2024

Fixes: #344

When the serverProperties["version"] is not empty the client now checks the version.
If the version is >=3.11 it is possible to enable the exchangeVersion function.

Fixes another small bug when the single active consumer is enabled

cc @hiimjako

and is less than 3.11

Signed-off-by: Gabriele Santomaggio <[email protected]>
and is less than 3.11

Signed-off-by: Gabriele Santomaggio <[email protected]>
@Gsantomaggio Gsantomaggio marked this pull request as ready for review July 29, 2024 11:51
@Gsantomaggio
Copy link
Member Author

@LucasZhanye can you please try this PR?

@LucasZhanye
Copy link

@LucasZhanye can you please try this PR?

Yes, I can

I try it later, and I will reply it after I finished

@LucasZhanye
Copy link

@Gsantomaggio I try it, it is work well!
the output log is : 2024/07/30 10:49:39 [info] - Server version is less than 3.11.0, skipping command version exchange.

And I want to know how long it will release a new version for it ?

@LucasZhanye
Copy link

@LucasZhanye can you please try this PR?

oh, I find another problem.

Now I can DeclareStream,NewProducer,but producer.Send is not take effect.

my code is :

const (
	EVENTSTREAM = "events"
)

type Event struct {
	Name string
}

func main() {
	// COnnect to the Stream Plugin on Rabbimq
	env, err := stream.NewEnvironment(
		stream.NewEnvironmentOptions().
			SetHost("10.2.3.200").
			SetPort(15552).
			SetUser("guest").
			SetPassword("guest"))
	if err != nil {
		panic(err)
	}
	// Declare the stream, Set segmentsize and Maxbytes on stream
	err = env.DeclareStream(EVENTSTREAM, stream.NewStreamOptions().
		SetMaxSegmentSizeBytes(stream.ByteCapacity{}.MB(1)).
		SetMaxLengthBytes(stream.ByteCapacity{}.MB(2)))

	if err != nil {
		panic(err)
	}

	// Create a new Producer
	producerOptions := stream.NewProducerOptions()
	producerOptions.SetProducerName("producer")

	// Batch 100 Events in the same Frame, and the SDK will handle everything
	// DEDPULICATION DOES NOT WORK WITH SUBENTRY
	producerOptions.SetSubEntrySize(100)
	producerOptions.SetCompression(stream.Compression{}.Gzip())

	producer, err := env.NewProducer(EVENTSTREAM, producerOptions)
	if err != nil {
		panic(err)
	}

	// Publish 6001 messages
	for i := 0; i <= 6001; i++ {
		// fmt.Println("i = ", i)
		event := Event{
			Name: "test",
		}

		data, err := json.Marshal(event)
		if err != nil {
			panic(err)
		}

		message := amqp.NewMessage(data)
		// Apply properties to our message
		props := &amqp.MessageProperties{
			CorrelationID: uuid.NewString(),
		}
		message.Properties = props
		// Set Publishing ID to prevent Deduplication
		message.SetPublishingId(int64(i))
		// Sending the message
		if err := producer.Send(message); err != nil {
			panic(err)
		}
	}

	producer.Close()
}

But the rabbitmq management show as below:
image

the queue message is zero,but I send 6002

@Gsantomaggio
Copy link
Member Author

@LucasZhanye, I cannot reproduce the issue.

Screenshot 2024-07-30 at 09 04 02

You should check the server logs and also try to remove these values:

SetMaxSegmentSizeBytes(stream.ByteCapacity{}.MB(1)).
SetMaxLengthBytes(stream.ByteCapacity{}.MB(2)))

also you should always enable the confirmation and wait before close the producer since the send is async

	producerOptions := stream.NewProducerOptions()
	producerOptions.SetProducerName("producer")

	// Batch 100 Events in the same Frame, and the SDK will handle everything
	// DEDPULICATION DOES NOT WORK WITH SUBENTRY
	producerOptions.SetSubEntrySize(100)
	producerOptions.SetCompression(stream.Compression{}.Gzip())

	producer, err := env.NewProducer(EVENTSTREAM, producerOptions)
	if err != nil {
		panic(err)
	}
	chPublishConfirm := producer.NotifyPublishConfirmation()
	totalConfirmed := int32(0)
	chFinished := make(chan struct{})
	go func() {
		for confirmed := range chPublishConfirm {
			for _, msg := range confirmed {
				if msg.IsConfirmed() {
					atomic.AddInt32(&totalConfirmed, 1)
					if atomic.LoadInt32(&totalConfirmed) == 6001 {
						chFinished <- struct{}{}
					}
				} else {
					fmt.Printf("message %s failed \n  ", msg.GetMessage().GetData())
				}

			}
		}
	}()

	// Publish 6001 messages
	for i := 0; i <= 6001; i++ {
		// fmt.Println("i = ", i)
		event := Event{
			Name: "test",
		}

		data, err := json.Marshal(event)
		if err != nil {
			panic(err)
		}

		message := amqp.NewMessage(data)
		// Apply properties to our message
		props := &amqp.MessageProperties{
			CorrelationID: uuid.NewString(),
		}
		message.Properties = props
		// Set Publishing ID to prevent Deduplication
		message.SetPublishingId(int64(i))
		// Sending the message
		if err := producer.Send(message); err != nil {
			panic(err)
		}
	}

	<-chFinished

	producer.Close()
	env.Close()
	fmt.Printf("Closed with total Confirmed: %d\n", totalConfirmed)

@LucasZhanye
Copy link

@LucasZhanye, I cannot reproduce the issue.

Screenshot 2024-07-30 at 09 04 02 You should check the server logs and also try to remove these values:
SetMaxSegmentSizeBytes(stream.ByteCapacity{}.MB(1)).
SetMaxLengthBytes(stream.ByteCapacity{}.MB(2)))

also you should always enable the confirmation and wait before close the producer since the send is async

	producerOptions := stream.NewProducerOptions()
	producerOptions.SetProducerName("producer")

	// Batch 100 Events in the same Frame, and the SDK will handle everything
	// DEDPULICATION DOES NOT WORK WITH SUBENTRY
	producerOptions.SetSubEntrySize(100)
	producerOptions.SetCompression(stream.Compression{}.Gzip())

	producer, err := env.NewProducer(EVENTSTREAM, producerOptions)
	if err != nil {
		panic(err)
	}
	chPublishConfirm := producer.NotifyPublishConfirmation()
	totalConfirmed := int32(0)
	chFinished := make(chan struct{})
	go func() {
		for confirmed := range chPublishConfirm {
			for _, msg := range confirmed {
				if msg.IsConfirmed() {
					atomic.AddInt32(&totalConfirmed, 1)
					if atomic.LoadInt32(&totalConfirmed) == 6001 {
						chFinished <- struct{}{}
					}
				} else {
					fmt.Printf("message %s failed \n  ", msg.GetMessage().GetData())
				}

			}
		}
	}()

	// Publish 6001 messages
	for i := 0; i <= 6001; i++ {
		// fmt.Println("i = ", i)
		event := Event{
			Name: "test",
		}

		data, err := json.Marshal(event)
		if err != nil {
			panic(err)
		}

		message := amqp.NewMessage(data)
		// Apply properties to our message
		props := &amqp.MessageProperties{
			CorrelationID: uuid.NewString(),
		}
		message.Properties = props
		// Set Publishing ID to prevent Deduplication
		message.SetPublishingId(int64(i))
		// Sending the message
		if err := producer.Send(message); err != nil {
			panic(err)
		}
	}

	<-chFinished

	producer.Close()
	env.Close()
	fmt.Printf("Closed with total Confirmed: %d\n", totalConfirmed)

Thanks so much!

Now I run main.go on local system, but the rabbitmq server is on remote, I have a problem when NewProducer, the log is: "panic: Authentication Failure"

But I run main.go on the remote system which the rabbitmq server on , it runs ok.

@LucasZhanye
Copy link

The debug log is:

2024/07/30 15:48:10 [info] - Server version is less than 3.11.0, skipping command version exchange
2024/07/30 15:48:10 [debug] - User root, connected to: 10.2.3.200:5552, vhost:test
2024/07/30 15:48:10 [debug] - Read connection failed: read tcp 172.24.127.40:36958->10.2.3.200:5552: use of closed network connection
2024/07/30 15:48:10 [info] - Server version is less than 3.11.0, skipping command version exchange
2024/07/30 15:48:10 [debug] - User root, connected to: 10.2.3.200:5552, vhost:test
2024/07/30 15:48:10 [debug] - Read connection failed: read tcp 172.24.127.40:36960->10.2.3.200:5552: use of closed network connection
2024/07/30 15:48:10 [info] - Server version is less than 3.11.0, skipping command version exchange
2024/07/30 15:48:10 [debug] - User root, connected to: 10.2.3.200:5552, vhost:test
2024/07/30 15:48:10 [debug] - Read connection failed: EOF
2024/07/30 15:48:10 [debug] - error removing heartbeat: Response #{id} not found
2024/07/30 15:48:10 [debug] - User:root, Authentication Failure
2024/07/30 15:48:10 [debug] - Read connection failed: read tcp 172.24.127.40:36964->10.2.3.200:5552: use of closed network connection

@Gsantomaggio
Copy link
Member Author

2024/07/30 15:48:10 [debug] - User:root, Authentication Failure

I think it is clear where is the problem here

Signed-off-by: Gabriele Santomaggio <[email protected]>
@hiimjako
Copy link
Collaborator

In my opinion, the problem of connecting to pre-3.11 servers is solved, as confirmed above. If there are no other technical problems, I think it is mergeable

@hiimjako hiimjako added the bug Something isn't working label Jul 30, 2024
@Gsantomaggio Gsantomaggio merged commit e0bcf7e into main Jul 30, 2024
4 checks passed
@Gsantomaggio Gsantomaggio deleted the check_version branch July 30, 2024 08:29
@LucasZhanye
Copy link

LucasZhanye commented Jul 30, 2024

Authentication Failure

Hi @Gsantomaggio Maybe I know why it is the queue is zero.

Because I have two rabbitmq server run by docker container, the first is listen on 5552, and the second is listen on 15552

When I exec main.go which is connect to the port 15552, it will send msg to the rabbitmq server which is listen on 5552.

From the log, at last it use 5552:

2024/07/30 16:22:50 [info] - Server version is less than 3.11.0, skipping command version exchange
2024/07/30 16:22:50 [debug] - User root, connected to: 10.2.3.200:15552, vhost:test
2024/07/30 16:22:50 [debug] - Read connection failed: read tcp 10.2.3.200:35938->10.2.3.200:15552: use of closed network connection
2024/07/30 16:22:50 [info] - Server version is less than 3.11.0, skipping command version exchange
2024/07/30 16:22:50 [debug] - **User root, connected to: 10.2.3.200:15552**, vhost:test
2024/07/30 16:22:50 [debug] - Read connection failed: read tcp 10.2.3.200:35940->10.2.3.200:15552: use of closed network connection
2024/07/30 16:22:50 [info] - Server version is less than 3.11.0, skipping command version exchange
2024/07/30 16:22:50 [debug] - User root, connected to: 10.2.3.200:15552, vhost:test
coordinatorKey =  **localhost:5552**
2024/07/30 16:22:50 [info] - Server version is less than 3.11.0, skipping command version exchange
2024/07/30 16:22:50 [debug] - User root, connected to: localhost:5552, vhost:test
2024/07/30 16:22:50 [debug] - Read connection failed: read tcp 10.2.3.200:35942->10.2.3.200:15552: use of closed network connection
2024/07/30 16:22:50 [debug] - waitForInflightMessages, channel: 2374 - pending messages len: 28 - unconfirmed len: 2502 - retry: 0
2024/07/30 16:22:50 [debug] - **Read connection failed: read tcp 127.0.0.1:45690->127.0.0.1:5552**: use of closed network connection
Closed with total Confirmed: 6001

Finally, I see the rabbitmq management which is listen on 5552 , the queue message is increased.
image

@Gsantomaggio
Copy link
Member Author

@hiimjako @LucasZhanye merged.

@LucasZhanye
Copy link

Authentication Failure

Hi @Gsantomaggio Maybe I know why it is the queue is zero.

Because I have two rabbitmq server run by docker container, the first is listen on 5552, and the second is listen on 15552

When I exec main.go which is connect to the port 15552, it will send msg to the rabbitmq server which is listen on 5552.

From the log, at last it use 5552:

2024/07/30 16:22:50 [info] - Server version is less than 3.11.0, skipping command version exchange
2024/07/30 16:22:50 [debug] - User root, connected to: 10.2.3.200:15552, vhost:test
2024/07/30 16:22:50 [debug] - Read connection failed: read tcp 10.2.3.200:35938->10.2.3.200:15552: use of closed network connection
2024/07/30 16:22:50 [info] - Server version is less than 3.11.0, skipping command version exchange
2024/07/30 16:22:50 [debug] - **User root, connected to: 10.2.3.200:15552**, vhost:test
2024/07/30 16:22:50 [debug] - Read connection failed: read tcp 10.2.3.200:35940->10.2.3.200:15552: use of closed network connection
2024/07/30 16:22:50 [info] - Server version is less than 3.11.0, skipping command version exchange
2024/07/30 16:22:50 [debug] - User root, connected to: 10.2.3.200:15552, vhost:test
coordinatorKey =  **localhost:5552**
2024/07/30 16:22:50 [info] - Server version is less than 3.11.0, skipping command version exchange
2024/07/30 16:22:50 [debug] - User root, connected to: localhost:5552, vhost:test
2024/07/30 16:22:50 [debug] - Read connection failed: read tcp 10.2.3.200:35942->10.2.3.200:15552: use of closed network connection
2024/07/30 16:22:50 [debug] - waitForInflightMessages, channel: 2374 - pending messages len: 28 - unconfirmed len: 2502 - retry: 0
2024/07/30 16:22:50 [debug] - **Read connection failed: read tcp 127.0.0.1:45690->127.0.0.1:5552**: use of closed network connection
Closed with total Confirmed: 6001

Finally, I see the rabbitmq management which is listen on 5552 , the queue message is increased. image

I find the solution on #315

I change my code to below:

addressResolver := stream.AddressResolver{
		Host: "10.2.3.200",
		Port: 15552,
	}

	env, err := stream.NewEnvironment(
		stream.NewEnvironmentOptions().
			SetHost(addressResolver.Host).
			SetPort(addressResolver.Port).
			SetAddressResolver(addressResolver).
			SetUser("root").SetPassword("root").SetVHost("test").
			SetMaxProducersPerClient(5))

It works ok!

@Gsantomaggio Gsantomaggio changed the title Check version when version is not Check version when serverProperties is not empty and < 3.11.0 Jul 30, 2024
@Gsantomaggio
Copy link
Member Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Can't not connect to Rabbitmq Stream via port 5552
3 participants