HasNext returns false when message is available #241

Closed
phemmer opened this issue May 8, 2020 · 2 comments

phemmer commented May 8, 2020

Expected behavior

Reader.HasNext() should return true when messages are available.

Actual behavior

Reader.HasNext() returns false when messages are available.

Steps to reproduce

package main

import (
	"context"
	"fmt"
	"os"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	if err := Main(); err != nil {
		fmt.Fprintf(os.Stderr, "Error: %s\n", err)
		os.Exit(1)
	}
	os.Exit(0)
}

func Main() error {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: "pulsar://localhost:6650",
	})
	if err != nil { return err }

	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic: "persistent://public/test/test",
	})
	if err != nil { return err }

	id, err := producer.Send(context.Background(), &pulsar.ProducerMessage{Payload: []byte("test")})
	if err != nil { return err }

	consumer, err := client.CreateReader(pulsar.ReaderOptions{
		Topic: "persistent://public/test/test",
		StartMessageID: id,
		StartMessageIDInclusive: true,
	})
	if err != nil { return err }

	hasnext := consumer.HasNext()
	fmt.Printf("HasNext: %v\n", hasnext)

	next, err := consumer.Next(context.Background())
	if err != nil { return err }
	fmt.Printf("Next: %+v\n", next)

	return nil
}

Output

INFO[0000] Connecting to broker                          remote_addr="pulsar://localhost:6650"
INFO[0000] TCP connection established                    local_addr="127.0.0.1:33626" remote_addr="pulsar://localhost:6650"
INFO[0000] Connection is ready                           local_addr="127.0.0.1:33626" remote_addr="pulsar://localhost:6650"
INFO[0000] Created producer                              cnx="127.0.0.1:33626 -> 127.0.0.1:6650" producer_name=standalone-0-130 topic="persistent://public/test/test"
INFO[0000] Connected consumer                            name= subscription=reader-knfbf topic="persistent://public/test/test"
INFO[0000] Created consumer                              name= subscription=reader-knfbf topic="persistent://public/test/test"
HasNext: false
Next: &{publishTime:{wall:195000000 ext:63724497913 loc:0xc19de0} eventTime:{wall:128448384 ext:55340232221 loc:0xc19de0} key: payLoad:[116 101 115 116] msgID:0xc000172000 properties:map[] topic:persistent://public/test/test replicationClusters:[] redeliveryCount:0}

System configuration

Pulsar version: 2.5.1

@wolfstudy
Member

You can try to use the following code example:

	consumer, err := client.CreateReader(pulsar.ReaderOptions{
		Topic: "persistent://public/default/test",
		StartMessageID: pulsar.EarliestMessageID(),
		StartMessageIDInclusive: true,
	})

For more usage examples, see the linked reference.

@wolfstudy
Member

The issue will be fixed in #329
