Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wait For Inflight Messages #104

Merged
merged 8 commits into from
Nov 9, 2021
Merged

Wait For Inflight Messages #104

merged 8 commits into from
Nov 9, 2021

Conversation

Gsantomaggio
Copy link
Member

Fixes #103

@codecov-commenter
Copy link

Codecov Report

Merging #104 (28d1799) into main (e421380) will increase coverage by 0.02%.
The diff coverage is 100.00%.

❗ Current head 28d1799 differs from pull request most recent head a65d84b. Consider uploading reports for the commit a65d84b to get more accurate results
Impacted file tree graph

@@            Coverage Diff             @@
##             main     #104      +/-   ##
==========================================
+ Coverage   79.28%   79.31%   +0.02%     
==========================================
  Files          16       16              
  Lines        2404     2407       +3     
==========================================
+ Hits         1906     1909       +3     
  Misses        363      363              
  Partials      135      135              
Impacted Files Coverage Δ
pkg/stream/coordinator.go 93.75% <100.00%> (ø)
pkg/stream/producer.go 85.81% <100.00%> (+0.14%) ⬆️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update e421380...a65d84b. Read the comment docs.

@Gsantomaggio Gsantomaggio changed the title WIP don't merge handle close Wait For Inflight Messages Nov 9, 2021
@Gsantomaggio
Copy link
Member Author

Gsantomaggio commented Nov 9, 2021

@brenol would you please test the PR?

you can use:

package main

import (
	"fmt"
	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs"
	"log"
	"sync/atomic"
	"time"

	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
)

const streamName = "test-output"

var count int32
var fail int32

func handlePublishConfirm(confirms stream.ChannelPublishConfirm) {
	go func() {
		for confirmed := range confirms {
			for _, msg := range confirmed {
				if msg.IsConfirmed() {
					atomic.AddInt32(&count, 1)

				} else {
					atomic.AddInt32(&fail, 1)
				}

			}
		}
	}()
}

func main() {

	env, err := stream.NewEnvironment(
		stream.NewEnvironmentOptions().SetUri("tcp://guest:guest@localhost:5552"),
	)
	if err != nil {
		panic(err)
	}
	if err := setupStream(env); err != nil {
		panic(err)
	}

	// consumer
	// using producer.Close() no messages will be consumed
	var consumerClose func() error
	go func() {
		opts := stream.NewConsumerOptions().
			SetConsumerName("test")
		cons, err := env.NewConsumer(streamName, consume, opts)
		if err != nil {
			panic(err)
		}
		consumerClose = cons.Close
	}()
	defer func() {
		consumerClose()
	}()

	prod, err := env.NewProducer(streamName, stream.NewProducerOptions())
	if err != nil {
		panic(err)
	}

	handlePublishConfirm(prod.NotifyPublishConfirmation())

	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	produced := 0
__LOOP:
	for {
		select {
		case <-ticker.C:
			break __LOOP
		default:
		}
		msg := amqp.NewMessage([]byte(fmt.Sprintf("producing: %d", produced)))
		if err := prod.Send(msg); err != nil {
			panic(err)
		}
		produced++
	}
	log.Printf("produced %d messages\n", produced)
	stream.SetLevelInfo(logs.DEBUG)
	notification := prod.NotifyClose()
	// change from prod.Close() to env.Close()
	if err := prod.Close(); err != nil {
		log.Println("could not close producer:", err)
	}
	<-notification

	log.Printf("got close notification. Produced: %d, Consumed: %d , failed %d - Total %d - confirmed %d \n", produced,
		atomic.LoadInt64(&consumed), atomic.LoadInt32(&fail),
		atomic.LoadInt64(&consumed)+int64(atomic.LoadInt32(&fail)), atomic.LoadInt32(&count))
}

var consumed int64

func consume(consumerContext stream.ConsumerContext, message *amqp.Message) {
	atomic.AddInt64(&consumed, 1)
}

func setupStream(env *stream.Environment) error {
	env.DeleteStream(streamName)
	ok, err := env.StreamExists(streamName)
	if err != nil {
		return err
	}
	if !ok {
		if err := env.DeclareStream(
			streamName,
			&stream.StreamOptions{MaxAge: 24 * 7 * time.Hour},
		); err != nil {
			return err
		}
	}
	return nil
}

at the end you should see something like:

2021/11/09 11:54:24 got close notification. Produced: 53174, Consumed: 53174 , failed 0 - Total 53174 - confirmed 53174

Thank you!

@brenol
Copy link

brenol commented Nov 9, 2021

Hi @Gsantomaggio, thank you for the fast response & fast PR. 🥳

However, unfortunately, it did not work 100% for me:

go.mod:

module github.com/afyadigital/playground/poc-stream

go 1.17

require github.com/rabbitmq/rabbitmq-stream-go-client v0.1.0-beta.0.20211108202302-8523f78f156b

require (
    github.com/golang/snappy v0.0.4 // indirect
    github.com/klauspost/compress v1.13.6 // indirect
    github.com/pierrec/lz4 v2.6.1+incompatible // indirect
    github.com/pkg/errors v0.9.1 // indirect
)

Code:

package main

import (
    "fmt"
    "log"
    "sync/atomic"
    "time"

    "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs"

    "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
    "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
)

const streamName = "test-output"

var count int32
var fail int32

func handlePublishConfirm(confirms stream.ChannelPublishConfirm) {
    go func() {
        for confirmed := range confirms {
            for _, msg := range confirmed {
                if msg.IsConfirmed() {
                    atomic.AddInt32(&count, 1)
                } else {
                    atomic.AddInt32(&fail, 1)
                }

            }
        }
    }()
}

func main() {

    env, err := stream.NewEnvironment(
        stream.NewEnvironmentOptions().SetUri("tcp://guest:guest@localhost:5552"),
    )
    if err != nil {
        panic(err)
    }
    if err := setupStream(env); err != nil {
        panic(err)
    }

    // consumer
    // using producer.Close() no messages will be consumed
    var consumerClose func() error
    go func() {
        opts := stream.NewConsumerOptions().
            SetConsumerName("test")
        cons, err := env.NewConsumer(streamName, consume, opts)
        if err != nil {
            panic(err)
        }
        consumerClose = cons.Close
    }()
    defer func() {
        consumerClose()
    }()

    prod, err := env.NewProducer(streamName, stream.NewProducerOptions())
    if err != nil {
        panic(err)
    }

    handlePublishConfirm(prod.NotifyPublishConfirmation())

    ticker := time.NewTicker(100 * time.Millisecond)
    defer ticker.Stop()
    produced := 0
__LOOP:
    for {
        select {
        case <-ticker.C:
            break __LOOP
        default:
        }
        msg := amqp.NewMessage([]byte(fmt.Sprintf("producing: %d", produced)))
        if err := prod.Send(msg); err != nil {
            panic(err)
        }
        produced++
    }
    log.Printf("produced %d messages\n", produced)
    stream.SetLevelInfo(logs.DEBUG)
    notification := prod.NotifyClose()
    // change from prod.Close() to env.Close()
    if err := prod.Close(); err != nil {
        log.Println("could not close producer:", err)
    }
    <-notification

    log.Printf("got close notification. Produced: %d, Consumed: %d , failed %d - Total %d - confirmed %d \n", produced,
        atomic.LoadInt64(&consumed), atomic.LoadInt32(&fail),
        atomic.LoadInt64(&consumed)+int64(atomic.LoadInt32(&fail)), atomic.LoadInt32(&count))
}

var consumed int64

func consume(consumerContext stream.ConsumerContext, message *amqp.Message) {
    atomic.AddInt64(&consumed, 1)
}

func setupStream(env *stream.Environment) error {
    env.DeleteStream(streamName)
    ok, err := env.StreamExists(streamName)
    if err != nil {
        return err
    }
    if !ok {
        if err := env.DeclareStream(
            streamName,
            &stream.StreamOptions{MaxAge: 24 * 7 * time.Hour},
        ); err != nil {
            return err
        }
    }
    return nil
}

Output:

$ go run main.go            
2021/11/09 14:39:05 produced 41115 messages
2021/11/09 14:39:06 [debug] - Read connection failed: read tcp 127.0.0.1:51032->127.0.0.1:5552: use of closed network connection
2021/11/09 14:39:06 got close notification. Produced: 41115, Consumed: 41045 , failed 70 - Total 41115 - confirmed 41045 
2021/11/09 14:39:06 [debug] - Read connection failed: read tcp 127.0.0.1:51036->127.0.0.1:5552: use of closed network connection

Using env.Close (using it only for comparison reason):

go run main.go
2021/11/09 14:53:01 produced 39399 messages
2021/11/09 14:53:02 [debug] - Read connection failed: read tcp 127.0.0.1:51162->127.0.0.1:5552: use of closed network connection
2021/11/09 14:53:02 [debug] - Read connection failed: read tcp 127.0.0.1:51160->127.0.0.1:5552: use of closed network connection
2021/11/09 14:53:02 got close notification. Produced: 39399, Consumed: 39399 , failed 0 - Total 39399 - confirmed 39399

So on my side its not working 100% of the time.

Thank you again!

@brenol
Copy link

brenol commented Nov 9, 2021

@Gsantomaggio one more test I just did. Tried to move consumerClose() & add a wait (as it could be failing because consumer was not... consuming everything) but unfortunately it did not work as well.

package main

import (
    "fmt"
    "log"
    "sync/atomic"
    "time"

    "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs"

    "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
    "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
)

const streamName = "test-output"

var count int32
var fail int32

func handlePublishConfirm(confirms stream.ChannelPublishConfirm) {
    go func() {
        for confirmed := range confirms {
            for _, msg := range confirmed {
                if msg.IsConfirmed() {
                    atomic.AddInt32(&count, 1)
                } else {
                    atomic.AddInt32(&fail, 1)
                }

            }
        }
    }()
}

func main() {
    env, err := stream.NewEnvironment(
        stream.NewEnvironmentOptions().SetUri("tcp://guest:guest@localhost:5552"),
    )
    if err != nil {
        panic(err)
    }
    if err := setupStream(env); err != nil {
        panic(err)
    }

    // consumer
    var (
        consumerClose        func() error
        consumerNotification stream.ChannelClose
    )
    go func() {
        opts := stream.NewConsumerOptions().
            SetConsumerName("test")
        cons, err := env.NewConsumer(streamName, consume, opts)
        if err != nil {
            panic(err)
        }
        consumerClose = cons.Close
        consumerNotification = cons.NotifyClose()
    }()

    prod, err := env.NewProducer(streamName, stream.NewProducerOptions())
    if err != nil {
        panic(err)
    }

    handlePublishConfirm(prod.NotifyPublishConfirmation())

    ticker := time.NewTicker(100 * time.Millisecond)
    defer ticker.Stop()
    produced := 0
__LOOP:
    for {
        select {
        case <-ticker.C:
            break __LOOP
        default:
        }
        msg := amqp.NewMessage([]byte(fmt.Sprintf("producing: %d", produced)))
        if err := prod.Send(msg); err != nil {
            panic(err)
        }
        produced++
    }
    log.Printf("produced %d messages\n", produced)
    stream.SetLevelInfo(logs.DEBUG)
    notification := prod.NotifyClose()
    // change from prod.Close() to env.Close()
    if err := prod.Close(); err != nil {
        log.Println("could not close producer:", err)
    }
    <-notification

    if err := consumerClose(); err != nil {
        log.Println("could not close consumer:", err)
    }
    <-consumerNotification

    log.Printf("got close notification. Produced: %d, Consumed: %d , failed %d - Total %d - confirmed %d \n", produced,
        atomic.LoadInt64(&consumed), atomic.LoadInt32(&fail),
        atomic.LoadInt64(&consumed)+int64(atomic.LoadInt32(&fail)), atomic.LoadInt32(&count))
}

var consumed int64

func consume(consumerContext stream.ConsumerContext, message *amqp.Message) {
    atomic.AddInt64(&consumed, 1)
}

func setupStream(env *stream.Environment) error {
    env.DeleteStream(streamName)
    ok, err := env.StreamExists(streamName)
    if err != nil {
        return err
    }
    if !ok {
        if err := env.DeclareStream(
            streamName,
            &stream.StreamOptions{MaxAge: 24 * 7 * time.Hour},
        ); err != nil {
            return err
        }
    }
    return nil
}

Output:

2021/11/09 15:08:36 produced 44737 messages
2021/11/09 15:08:37 [debug] - Read connection failed: read tcp 127.0.0.1:51604->127.0.0.1:5552: use of closed network connection
2021/11/09 15:08:37 [debug] - Read connection failed: read tcp 127.0.0.1:51602->127.0.0.1:5552: use of closed network connection
2021/11/09 15:08:37 got close notification. Produced: 44737, Consumed: 44659 , failed 78 - Total 44737 - confirmed 44659

@Gsantomaggio
Copy link
Member Author

Thanks @brenol but the commit is not correct :)

require github.com/rabbitmq/rabbitmq-stream-go-client v0.1.0-beta.0.20211108202302-8523f78f156b

you should use:

module github.com/test/client

go 1.17

require github.com/rabbitmq/rabbitmq-stream-go-client v0.1.0-beta.0.20211109104850-f29ee0cf4470

require (
	github.com/golang/snappy v0.0.4 // indirect
	github.com/klauspost/compress v1.13.6 // indirect
	github.com/pierrec/lz4 v2.6.1+incompatible // indirect
	github.com/pkg/errors v0.9.1 // indirect
)

and you should see this log:

2021/11/09 19:11:23 [debug] - waitForInflightMessages, channel: 0 - pending messages len: 42 - unconfirmed len: 26012 - retry: 0

@brenol
Copy link

brenol commented Nov 9, 2021

@Gsantomaggio sorry!! Thank you, it worked flawlessly.

I did not read the commit history correctly (in PRs older are shown first. In the branch, new ones are shown first... 😭 - so sorry again for that!)

Now:

2021/11/09 15:21:37 produced 67675 messages
2021/11/09 15:21:37 [debug] - waitForInflightMessages, channel: 0 - pending messages len: 15 - unconfirmed len: 3663 - retry: 0
2021/11/09 15:21:38 [debug] - Read connection failed: read tcp 127.0.0.1:51988->127.0.0.1:5552: use of closed network connection
2021/11/09 15:21:38 [debug] - Read connection failed: read tcp 127.0.0.1:51986->127.0.0.1:5552: use of closed network connection
2021/11/09 15:21:38 got close notification. Produced: 67675, Consumed: 67675 , failed 0 - Total 67675 - confirmed 67675

Thank you a lot!

@Gsantomaggio
Copy link
Member Author

No problem! :)!
Thank you!

@Gsantomaggio Gsantomaggio merged commit fe231a7 into main Nov 9, 2021
@Gsantomaggio Gsantomaggio deleted the handle_close branch November 9, 2021 18:28
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

client/Producer: Close() is not flushing current inflight messages
3 participants