Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LastConsumed behaviuor seems a little broken #321

Closed
HustonMmmavr opened this issue Jun 21, 2024 · 3 comments
Closed

LastConsumed behaviuor seems a little broken #321

HustonMmmavr opened this issue Jun 21, 2024 · 3 comments
Labels
enhancement New feature or request

Comments

@HustonMmmavr
Copy link
Contributor

HustonMmmavr commented Jun 21, 2024

Is your feature request related to a problem? Please describe.

Hello!

I've found that LastConsumed offset would replay the last message at the stream.
Here is a repro:

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"strconv"
	"time"

	"github.com/google/uuid"
	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
)

func CheckErr(err error) {
	if err != nil {
		fmt.Printf("%s ", err)
		os.Exit(1)
	}
}

func main() {
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
	defer stop()
	env, err := stream.NewEnvironment(
		stream.NewEnvironmentOptions().
			SetHost("localhost").
			SetPort(5552).
			SetUser("guest").
			SetPassword("guest").SetRequestedHeartbeat(time.Second * 10))
	CheckErr(err)

	streamName := uuid.New().String()
	err = env.DeclareStream(streamName,
		&stream.StreamOptions{
			MaxLengthBytes: stream.ByteCapacity{}.MB(500),
		},
	)
	CheckErr(err)

	consName := "consumer_"
	producer, err := env.NewProducer(streamName, nil)
	CheckErr(err)

	go func() {
		for i := 0; i < 1; i++ {
			err := producer.Send(amqp.NewMessage([]byte("hello_world_" + strconv.Itoa(i))))
			CheckErr(err)
		}
	}()

	msgChan := make(chan string)

	handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
		err := consumerContext.Consumer.StoreCustomOffset(consumerContext.Consumer.GetOffset())
		if err != nil {
			CheckErr(err)
		}
		fmt.Printf("offset %d\n", consumerContext.Consumer.GetOffset())
		msgChan <- string(message.GetData())
	}

	cons, err := env.NewConsumer(streamName, handleMessages, stream.
		NewConsumerOptions().
		SetManualCommit().
		SetOffset(stream.OffsetSpecification{}.LastConsumed()).SetConsumerName(consName),
	)
	CheckErr(err)
	fmt.Println(<-msgChan)
	cons.Close()

	consCopy, err := env.NewConsumer(streamName, handleMessages, stream.
		NewConsumerOptions().
		SetManualCommit().
		SetOffset(stream.OffsetSpecification{}.LastConsumed()).SetConsumerName(consName),
	)
	CheckErr(err)
	fmt.Println(<-msgChan)
	defer consCopy.Close()

	<-ctx.Done()
}

The output is next:

offset 0
hello_world_0
offset 0
hello_world_0

Is it an expected behaviour?

Describe the solution you'd like

In my code i have to use Offset specification instead of LastConsumed, and inside the callback check if consumer context offset equals QueryOffset at the start of consumer, than this message was processed and it should be skipped

But is it correct to make someting like this: query offset before starting of consumer, and if offset exists increment it:

offset, err := env.QueryOffset(consName, streamName) 
//handle error
if offsetExists {
	offset++ // is this increment correct
}
consCopy, err := env.NewConsumer(streamName, handleMessages, stream.
	NewConsumerOptions().
	SetManualCommit().
	SetOffset(stream.OffsetSpecification{}.Offset(offset)).SetConsumerName(consName),
)

If this solution is correct, may be it's worth to setup lastOffset + 1 here

Describe alternatives you've considered

No response

Additional context

No response

@HustonMmmavr HustonMmmavr added the enhancement New feature or request label Jun 21, 2024
@Gsantomaggio
Copy link
Member

Hi @HustonMmmavr,
LastConsumed API was meant to help the user to restart from the last stored message.
Over the years, I have seen that it created more confusion than helping.
Also the name is misleading.

This is why I decided to deprecate the API, The correct way is what you have done:

offset, err := env.QueryOffset(consName, streamName) 
//handle error
if offsetExists {
	offset++ // is this increment correct
}

@HustonMmmavr
Copy link
Contributor Author

Got it! Thank you)

@Gsantomaggio
Copy link
Member

The method is deprecated now #324.

Thank you @HustonMmmavr

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

2 participants