
goroutine leak #462

Open
tsywkGo opened this issue Feb 5, 2021 · 3 comments
tsywkGo commented Feb 5, 2021

Expected behavior

func (c *connection) start() {
	// Each connection gets its own goroutine that will
	go func() {
		if c.connect() {
			if c.doHandshake() {
				c.run()
			} else {
				c.changeState(connectionClosed)
			}
		} else {
			c.changeState(connectionClosed)
		}
	}()
}

In this code, if Pulsar is not working properly, the goroutine leaks.

freeznet (Contributor) commented Feb 9, 2021

Thanks @yuanweikang2020 for this issue. Can you provide more context on how to reproduce or verify it?

tsywkGo (Author) commented Feb 9, 2021

> thanks @yuanweikang2020 for this issue, can you provide more context how to reproduce or verify this issue?

When Pulsar fails, the client reconnects:

func (p *partitionProducer) runEventsLoop() {
	for {
		select {
		case i := <-p.eventsChan:
			switch v := i.(type) {
			case *sendRequest:
				p.internalSend(v)
			case *connectionClosed:
				p.reconnectToBroker()
			case *flushRequest:
				p.internalFlush(v)
			case *closeProducer:
				p.internalClose(v)
				return
			}

		case <-p.batchFlushTicker.C:
			p.internalFlushCurrentBatch()
		}
	}
}

When reconnecting to the Pulsar broker:

func (p *partitionProducer) reconnectToBroker() {
	backoff := internal.Backoff{}
	for {
		if p.state != producerReady {
			// Producer is already closing
			return
		}

		d := backoff.Next()
		p.log.Info("Reconnecting to broker in ", d)
		time.Sleep(d)

		err := p.grabCnx()
		if err == nil {
			// Successfully reconnected
			p.log.WithField("cnx", p.cnx.ID()).Info("Reconnected producer to broker")
			return
		}
	}
}

It gets a connection frequently:

func (c *rpcClient) getConn(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error) {
	cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr)
	backoff := Backoff{1 * time.Second}
	startTime := time.Now()
	var retryTime time.Duration
	if err != nil {
		for time.Since(startTime) < c.requestTimeout {
			retryTime = backoff.Next()
			c.log.Debugf("Reconnecting to broker in {%v} with timeout in {%v}", retryTime, c.requestTimeout)
			time.Sleep(retryTime)
			cnx, err = c.pool.GetConnection(logicalAddr, physicalAddr)
			if err == nil {
				c.log.Debugf("retry connection success")
				return cnx, nil
			}
		}
		return nil, err
	}
	return cnx, nil
}

rueian (Contributor) commented Feb 17, 2021

There are also goroutine leaks in consumer_partition.go and connection.go

func (c *connection) failLeftRequestsWhenClose() {
	for req := range c.incomingRequestsCh {
		c.internalSendRequest(req)
	}
	close(c.incomingRequestsCh)
}

The incomingRequestsCh is never closed, therefore this failLeftRequestsWhenClose will never exit.

for i := range pc.eventsCh {

The eventsCh is never closed, therefore this runEventsLoop will never exit.
