Giter Club home page Giter Club logo

Comments (12)

wagslane avatar wagslane commented on May 25, 2024 1

Sorry I've been crazy busy at work. I want to look at this when I have some time just haven't had a chance yet

from go-rabbitmq.

lmb1113 avatar lmb1113 commented on May 25, 2024

Close your publishers and consumers when you're done with them and do not attempt to reuse them. Only close the connection itself once you've closed all associated publishers and consumers.

The issue has been resolved through conn.close.But it's not possible to repeat newConn.I did not reuse the publisher, but I reused Conn. When the publisher closed, startNotifyBlockedHandler was still blocking, so I had to reuse the publisher

from go-rabbitmq.

LucaWolf avatar LucaWolf commented on May 25, 2024

tough, that seems to be a leaky goroutine :-( indeed, the design does not cater for its termination. Short of moving things around [ e.g. a) provide an additional control channel like a context triggering on publisher close events or b) making the blocked chan parameter part of the publisher struct -- so it closes automatically when publisher is GC + changing the range blockings into a for/select also listening to this additional channel in addition to blockings], your best bet is to leave that goroutine commented out and live with the fact that connection blocking (has this really happened ever in real life?) will block your publishing/consuming (which ideally should have a time-out anyway).

from go-rabbitmq.

wagslane avatar wagslane commented on May 25, 2024

It seems to me that the most likely issue is that you have a lot of Publish() calls getting blocked. This function: startNotifyBlockedHandler waits for any in-flight publishes to finish and then stops new ones from happening. So if they are hanging indefinately for some reason that could be the root cause. I'd argue something at the infrastructure level is probably the issue.

You could add a context timeout, and that should stop them from hanging forever.

from go-rabbitmq.

mjurczik avatar mjurczik commented on May 25, 2024

Hello, this seems still to be an issue.

I could reproduce this goroutine leak by using the example of the readme and printing the stacktrace before and after closing the publisher. The goroutine started at startup will not terminate with closing the publisher.

The problem is the goroutine is started and passes a notify chan (calling go channels chan to prevent confusion with rabbitmqs Channels) to the Connection. This chan will only be closed if the connection to rabbitmq is closed here.

But a long lived service will stay connected to the rabbitmq therefore hold the Connection alive. And only creating and closing publishers as needed. This results in the connection holding all channels in the blocks array created on startNotifyBlockedHandler, not closing it when a publisher shutdowns and keeping the goroutine at the for loop alive.

Contrary to the startNotifyFlowHandler which is also started on the creation of a publisher its chan is managed by a Channel. This channel is closed if the publisher Close function is called. Which terminates this routine started by the publisher.

I have two ideas for a fix:

  • your above mentioned context which is passed down into the function and acts as a lifetime, if the publisher is closed this lifetime is exceeded and the routine finishes. This is NOT allowed to close the channel or it will panic if the Connection is terminated and trying to close the channel aswell
  • tag the channel passed, notify the connection to remove the channel on publisher close. This would require changes in the wrapped library as well

I could implement the first. The second option would need discussion and implementation in amqp091-go library first.

Take care!

from go-rabbitmq.

lmb1113 avatar lmb1113 commented on May 25, 2024

Hello, this seems still to be an issue.

I could reproduce this goroutine leak by using the example of the readme and printing the stacktrace before and after closing the publisher. The goroutine started at startup will not terminate with closing the publisher.

The problem is the goroutine is started and passes a notify chan (calling go channels chan to prevent confusion with rabbitmqs Channels) to the Connection. This chan will only be closed if the connection to rabbitmq is closed here.

But a long lived service will stay connected to the rabbitmq therefore hold the Connection alive. And only creating and closing publishers as needed. This results in the connection holding all channels in the blocks array created on startNotifyBlockedHandler, not closing it when a publisher shutdowns and keeping the goroutine at the for loop alive.

Contrary to the startNotifyFlowHandler which is also started on the creation of a publisher its chan is managed by a Channel. This channel is closed if the publisher Close function is called. Which terminates this routine started by the publisher.

I have two ideas for a fix:

  • your above mentioned context which is passed down into the function and acts as a lifetime, if the publisher is closed this lifetime is exceeded and the routine finishes. This is NOT allowed to close the channel or it will panic if the Connection is terminated and trying to close the channel aswell
  • tag the channel passed, notify the connection to remove the channel on publisher close. This would require changes in the wrapped library as well

I could implement the first. The second option would need discussion and implementation in amqp091-go library first.

Take care!

once.Do rabbitmq.NewPublisher;Reusing this connection has not caused any problems after large-scale high-concurrency testing

from go-rabbitmq.

mjurczik avatar mjurczik commented on May 25, 2024

For me this is not a fix of the problem. This does not change the fact that even initializing one publisher and closing it will not stop all started goroutines from this publisher thus resulting in a leaking goroutine.
Furthermore someone can have multiple publisher which are written to simultaneously with different publisher options configured. So i'm confused how it should be possible to reuse a publisher created with different settings which does not allow to change them. Can you elaborate this more?

from go-rabbitmq.

lmb1113 avatar lmb1113 commented on May 25, 2024

For me this is not a fix of the problem. This does not change the fact that even initializing one publisher and closing it will not stop all started goroutines from this publisher thus resulting in a leaking goroutine. Furthermore someone can have multiple publisher which are written to simultaneously with different publisher options configured. So i'm confused how it should be possible to reuse a publisher created with different settings which does not allow to change them. Can you elaborate this more?

var once sync.Once
var oncePublish sync.Once
var conn *rabbitmq.Conn
var publisher *rabbitmq.Publisher

func GetMqConn() *rabbitmq.Conn {
	once.Do(func() {
		var err error
		conn, err = rabbitmq.NewConn(GetMqUrl())
		if err != nil {
			logging.Logger.Error("rabbitmq conn err", zap.Error(err))
		}
	})
	return conn
}

func GetPublisher() (*rabbitmq.Publisher, error) {
	var err error
	oncePublish.Do(func() {
		publisher, err = rabbitmq.NewPublisher(
			GetMqConn(),
			rabbitmq.WithPublisherOptionsLogging,
		)
	})
	return publisher, err
}

Our business is relatively simple, with only one configuration. We can use GetPublisher globally, which has withstood the test of high concurrency. If you have multiple configurations, my suggestion is to put different configurations of Publishers into the Publisher pool to reuse the same configuration of Publishers

from go-rabbitmq.

mjurczik avatar mjurczik commented on May 25, 2024

Ah thank you for your example @lmb1113 . We have a different use case, we open a lot of publishers because each client gets its own queue and exchange for incoming messages

from go-rabbitmq.

mjurczik avatar mjurczik commented on May 25, 2024

Hello,

dont want to necro bump but @wagslane should i open a new issue for this? Or can this issue be reopened?

from go-rabbitmq.

lmb1113 avatar lmb1113 commented on May 25, 2024

Hello, I noticed that each publisher has created the startNotifyBlockedHandler function. Can we adjust it to use the same function for each conn? My understanding is that the function of startNotifyBlockedHandler is to prevent publishing if the connection is not available
@mjurczik @wagslane
Reproduce memory leak process, when globally connected once, multiple publishers

from go-rabbitmq.

lmb1113 avatar lmb1113 commented on May 25, 2024

publisher.startNotifyBlockedHandler()->conn.startNotifyBlockedHandler()?

from go-rabbitmq.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.