
Comments (20)

wagslane commented on September 2, 2024

Yeah, this makes sense to me @mjurczik! If we have 3 publishers on a single connection, we should be able to close 1 publisher, keep the connection alive for the other 2, and release any publisher-specific resources.

If you'd like to propose a PR, I will take a look when I have time.

wagslane commented on September 2, 2024

Sorry, I've been crazy busy at work. I want to look at this when I have some time; I just haven't had a chance yet.

mjurczik commented on September 2, 2024

Hello,

So not all resources are necessarily expected to be cleaned up when you close the publisher; you would need to also close the underlying connection itself.

IMO this should not be expected. A connection should be reusable; removing a publisher and having it free its memory along with its started goroutines should not force you to close the connection as well.
In the readme itself it says: Close your publishers and consumers when you're done with them and do not attempt to reuse them. Only close the connection itself once you've closed all associated publishers and consumers.

When you close both are you still seeing a "leak"?

Sure, if I shut down my program, gracefully close all publishers, and afterwards close the connection, the goroutines are stopped gracefully. That happens because the conn.Close() shutdown closes the channels blocking the startNotifyFlowHandler.

Closing the connection does not seem to be a suitable fix:

Let's say we have 3 publishers and we want to close one of them because it is not needed anymore.
We can't simply close it AND the connection; that would break the connection still required by the other in-use publishers as well. Closing only the publisher keeps the startNotifyFlowHandler goroutine alive, even though it no longer does anything, because the publisher it is assigned to, and that the notifications would be reported to, is already closed and discarded.

I thought about my "fix" with the lifetime. The problem is that this would finish the goroutine and allow the publisher and its goroutine to be garbage collected, but the chan would still be held in the Connection's blocks array in the amqp lib. I guess it would be better to implement something similar in the ConnectionManager: it would hold the block channels, but NotifyBlockedSafe would not pass them down; instead the ConnectionManager would pass a single channel down to amqp and, on a signal, iterate through the self-managed block channels. Additionally, a function to remove a channel from the ConnectionManager would be needed.
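
For what it's worth, here is a minimal, self-contained sketch of that fan-out idea. The names (blockedFanout, Register, Remove, run) are hypothetical and are not the actual go-rabbitmq ConnectionManager API: a single chan is registered with the underlying amqp connection, and each Blocking event is forwarded to per-publisher chans that can be removed individually when a publisher closes.

package fanout

import (
	"sync"

	amqp "github.com/rabbitmq/amqp091-go"
)

// blockedFanout owns the set of per-publisher notification chans.
type blockedFanout struct {
	mu   sync.Mutex
	subs map[chan amqp.Blocking]struct{}
}

// Register returns a new per-publisher chan that will receive blocked notifications.
func (f *blockedFanout) Register() chan amqp.Blocking {
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.subs == nil {
		f.subs = make(map[chan amqp.Blocking]struct{})
	}
	ch := make(chan amqp.Blocking, 1)
	f.subs[ch] = struct{}{}
	return ch
}

// Remove unregisters and closes a per-publisher chan when its publisher is closed.
func (f *blockedFanout) Remove(ch chan amqp.Blocking) {
	f.mu.Lock()
	defer f.mu.Unlock()
	if _, ok := f.subs[ch]; ok {
		delete(f.subs, ch)
		close(ch)
	}
}

// run forwards events from the single chan registered with the amqp connection
// (e.g. conn.NotifyBlocked(make(chan amqp.Blocking, 1))) to all subscribers.
// It exits when amqp closes that chan on connection shutdown.
func (f *blockedFanout) run(upstream chan amqp.Blocking) {
	for b := range upstream {
		f.mu.Lock()
		for ch := range f.subs {
			select {
			case ch <- b:
			default: // never block the fan-out on a slow or abandoned subscriber
			}
		}
		f.mu.Unlock()
	}
}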

If you like, I can implement this and create a PR, and you can still decide whether it makes sense for you :)

lmb1113 commented on September 2, 2024

Close your publishers and consumers when you're done with them and do not attempt to reuse them. Only close the connection itself once you've closed all associated publishers and consumers.

The issue has been resolved through conn.Close(), but it's not possible to repeat NewConn. I did not reuse the publisher, but I reused the Conn. When the publisher closed, startNotifyBlockedHandler was still blocking, so I had to reuse the publisher.

LucaWolf commented on September 2, 2024

Tough, that seems to be a leaky goroutine :-( Indeed, the design does not cater for its termination. Short of moving things around (e.g. (a) providing an additional control channel, like a context triggering on publisher close events, or (b) making the blocked chan parameter part of the publisher struct so it closes automatically when the publisher is garbage collected, plus changing the range over blockings into a for/select that also listens on this additional channel), your best bet is to leave that goroutine commented out and live with the fact that connection blocking (has this ever really happened in real life?) will block your publishing/consuming (which ideally should have a time-out anyway).

wagslane commented on September 2, 2024

It seems to me that the most likely issue is that you have a lot of Publish() calls getting blocked. This function, startNotifyBlockedHandler, waits for any in-flight publishes to finish and then stops new ones from happening. So if they are hanging indefinitely for some reason, that could be the root cause. I'd argue something at the infrastructure level is probably the issue.

You could add a context timeout, and that should stop them from hanging forever.
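
A minimal sketch of that suggestion, assuming your version of go-rabbitmq exposes Publisher.PublishWithContext (check your version's API; the routing key below is just a placeholder):

package example

import (
	"context"
	"log"
	"time"

	rabbitmq "github.com/wagslane/go-rabbitmq"
)

func publishWithTimeout(publisher *rabbitmq.Publisher, body []byte) {
	// Bound the publish so a blocked or unavailable connection cannot hang the caller forever.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	err := publisher.PublishWithContext(ctx, body, []string{"my_routing_key"})
	if err != nil {
		// With the timeout in place, the call returns an error instead of blocking indefinitely.
		log.Println("publish failed or timed out:", err)
	}
}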

mjurczik commented on September 2, 2024

Hello, this still seems to be an issue.

I could reproduce this goroutine leak by using the example from the readme and printing the stack trace before and after closing the publisher. The goroutine started at startup does not terminate when the publisher is closed.

The problem is that the goroutine is started and passes a notify chan (calling Go channels "chan" to avoid confusion with RabbitMQ Channels) to the Connection. This chan is only closed when the connection to RabbitMQ is closed here.

But a long-lived service stays connected to RabbitMQ and therefore keeps the Connection alive, only creating and closing publishers as needed. This results in the connection holding all the chans added to the blocks array by startNotifyBlockedHandler, never closing them when a publisher shuts down, and keeping the goroutine alive at the for loop.

In contrast, the chan used by startNotifyFlowHandler, which is also started when a publisher is created, is managed by a Channel. That Channel is closed when the publisher's Close function is called, which terminates the goroutine started by the publisher.

I have two ideas for a fix:

  • your above-mentioned context, which is passed down into the function and acts as a lifetime: when the publisher is closed, the lifetime ends and the goroutine finishes. This goroutine is NOT allowed to close the chan itself, or it will panic when the Connection is terminated and also tries to close the chan
  • tag the chan that is passed, and notify the connection to remove it on publisher close. This would require changes in the wrapped library as well

I could implement the first. The second option would need discussion and implementation in the amqp091-go library first.
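
A rough sketch of what the first option could look like (a hypothetical shape, not the actual go-rabbitmq internals): the handler goroutine selects on the lifetime context as well as the notification chan, and returns without ever closing the chan itself.

package example

import (
	"context"

	amqp "github.com/rabbitmq/amqp091-go"
)

// startNotifyBlockedHandler, reworked to take a lifetime context that the
// publisher would cancel from its Close method.
func startNotifyBlockedHandler(ctx context.Context, blockings chan amqp.Blocking) {
	for {
		select {
		case <-ctx.Done():
			// Publisher closed: stop the goroutine, but do NOT close(blockings);
			// the amqp Connection owns that chan, and closing it here would
			// panic (double close) when the connection shuts down.
			return
		case b, ok := <-blockings:
			if !ok {
				// The connection was closed and closed the chan for us.
				return
			}
			_ = b // react to the blocked/unblocked notification here
		}
	}
}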

Take care!

lmb1113 commented on September 2, 2024

(quoting @mjurczik's comment above in full)

Wrapping rabbitmq.NewPublisher in once.Do and reusing this connection has not caused any problems after large-scale high-concurrency testing.

mjurczik commented on September 2, 2024

For me this is not a fix for the problem. It does not change the fact that even initializing a single publisher and then closing it will not stop all the goroutines started by that publisher, which results in a leaked goroutine.
Furthermore, someone can have multiple publishers that are written to simultaneously, each configured with different publisher options. So I'm confused how it would be possible to reuse a publisher that was created with different settings and does not allow changing them. Can you elaborate on this?

lmb1113 commented on September 2, 2024

(quoting @mjurczik's reply above)

var once sync.Once
var oncePublish sync.Once
var conn *rabbitmq.Conn
var publisher *rabbitmq.Publisher

func GetMqConn() *rabbitmq.Conn {
	once.Do(func() {
		var err error
		conn, err = rabbitmq.NewConn(GetMqUrl())
		if err != nil {
			logging.Logger.Error("rabbitmq conn err", zap.Error(err))
		}
	})
	return conn
}

func GetPublisher() (*rabbitmq.Publisher, error) {
	var err error
	oncePublish.Do(func() {
		publisher, err = rabbitmq.NewPublisher(
			GetMqConn(),
			rabbitmq.WithPublisherOptionsLogging,
		)
	})
	return publisher, err
}

Our business is relatively simple, with only one configuration, so we can use GetPublisher globally; it has withstood the test of high concurrency. If you have multiple configurations, my suggestion is to put the differently configured publishers into a publisher pool, so that publishers with the same configuration are reused.
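
A minimal sketch of that pool idea, reusing the GetMqConn helper from the example above (the pool structure and key scheme are just an illustration, not a library feature): cache one publisher per configuration key and hand it out instead of creating and closing publishers per call.

package example

import (
	"sync"

	rabbitmq "github.com/wagslane/go-rabbitmq"
)

var (
	poolMu sync.Mutex
	pool   = map[string]*rabbitmq.Publisher{}
)

// GetPooledPublisher returns the cached publisher for a configuration key,
// creating it on first use, so publishers with the same settings are reused.
func GetPooledPublisher(key string, opts ...func(*rabbitmq.PublisherOptions)) (*rabbitmq.Publisher, error) {
	poolMu.Lock()
	defer poolMu.Unlock()
	if p, ok := pool[key]; ok {
		return p, nil
	}
	p, err := rabbitmq.NewPublisher(GetMqConn(), opts...)
	if err != nil {
		return nil, err
	}
	pool[key] = p
	return p, nil
}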

mjurczik commented on September 2, 2024

Ah, thank you for your example @lmb1113. We have a different use case: we open a lot of publishers because each client gets its own queue and exchange for incoming messages.

mjurczik commented on September 2, 2024

Hello,

I don't want to necro-bump, but @wagslane, should I open a new issue for this? Or can this issue be reopened?

lmb1113 commented on September 2, 2024

Hello, I noticed that each publisher starts its own startNotifyBlockedHandler. Can we adjust it to use a single handler per conn? My understanding is that the purpose of startNotifyBlockedHandler is to prevent publishing while the connection is not available.
@mjurczik @wagslane
To reproduce the memory leak: connect globally once, then create multiple publishers.
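
A minimal reproduction sketch of that scenario (the URL is a placeholder, and the API is the wagslane/go-rabbitmq usage shown elsewhere in this thread): one shared connection, many publishers created and closed, and the goroutine count stays elevated because each closed publisher leaves its blocked-notification handler behind.

package main

import (
	"fmt"
	"runtime"

	rabbitmq "github.com/wagslane/go-rabbitmq"
)

func main() {
	conn, err := rabbitmq.NewConn("amqp://guest:guest@localhost") // placeholder URL
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	fmt.Println("goroutines before:", runtime.NumGoroutine())
	for i := 0; i < 100; i++ {
		pub, err := rabbitmq.NewPublisher(conn)
		if err != nil {
			panic(err)
		}
		pub.Close() // does not stop the publisher's startNotifyBlockedHandler goroutine
	}
	// With the leak, this count remains high even though every publisher is closed.
	fmt.Println("goroutines after:", runtime.NumGoroutine())
}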

lmb1113 commented on September 2, 2024

publisher.startNotifyBlockedHandler()->conn.startNotifyBlockedHandler()?

hotrush commented on September 2, 2024

We also ran into this issue. We don't reuse publishers or consumers, but when we get a lot of messages to publish we see OOMs with ~2-3 GB of memory usage, while regular memory usage is about 100-200 MB. Hope you can fix it.

lmb1113 commented on September 2, 2024

publisher.startNotifyBlockedHandler()->conn.startNotifyBlockedHandler()?

We also met this issue. We don't reuse publishers or consumers, but when getting a lot of messages to publish we receive OOMs with ~2-3gb memory usage when regular memory usage is about 100-200mb. Hope you can fix it.

I suggest reusing publishers or consumers.

hotrush commented on September 2, 2024

@lmb1113 we tried that approach but got a lot of errors like here:

lmb1113 commented on September 2, 2024

@lmb1113 we tried that approach but got a lot of errors like here:

Reuse connections and reuse publishers or consumers?

(quoting my earlier comment with the GetMqConn / GetPublisher example above)

@hotrush Reuse connections and reuse publishers or consumers?

hotrush commented on September 2, 2024

@lmb1113 the connection is always reused. With the publisher reused, we get connection/reconnection issues; with the publisher not reused, memory leaks. Something like that.

wagslane commented on September 2, 2024

@mjurczik So not all resources are necessarily expected to be cleaned up when you close the publisher; you would need to also close the underlying connection itself. When you close both, are you still seeing a "leak"?

(And if that's not a suitable use case you might want to just drop down to the amqp lib)
