Comments (12)
Sorry, I've been crazy busy at work. I want to look at this when I have some time; I just haven't had a chance yet.
from go-rabbitmq.
Close your publishers and consumers when you're done with them and do not attempt to reuse them. Only close the connection itself once you've closed all associated publishers and consumers.
The issue has been resolved through conn.Close. But it's not possible to call NewConn repeatedly. I did not reuse the publisher, but I reused Conn. When the publisher closed, startNotifyBlockedHandler was still blocking, so I had to reuse the publisher.
from go-rabbitmq.
Tough, that seems to be a leaky goroutine :-( Indeed, the design does not cater for its termination. Short of moving things around, e.g.
a) providing an additional control channel, like a context, that triggers on publisher close events, or
b) making the blocked chan parameter part of the publisher struct, so it closes automatically when the publisher is GC'd, plus changing the range over blockings into a for/select that listens to this additional channel in addition to blockings,
your best bet is to leave that goroutine commented out and live with the fact that connection blocking (has this really ever happened in real life?) will block your publishing/consuming (which ideally should have a time-out anyway).
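A minimal sketch of the for/select variant described above, using only the standard library. The `blocking` type, `watchBlocked`, and the `done` channel are hypothetical stand-ins for illustration, not the library's actual code.

```go
package main

import (
	"fmt"
	"sync"
)

// blocking is a hypothetical stand-in for the blocked notification value.
type blocking struct {
	Active bool
	Reason string
}

// watchBlocked consumes blocked notifications until either the notification
// channel is closed (connection gone) or done is closed (publisher closed).
// It never closes blockings itself: that channel belongs to the connection.
func watchBlocked(blockings <-chan blocking, done <-chan struct{}, onChange func(active bool)) {
	for {
		select {
		case b, ok := <-blockings:
			if !ok {
				return // the connection closed the notification channel
			}
			onChange(b.Active)
		case <-done:
			return // the publisher was closed; stop without touching blockings
		}
	}
}

func main() {
	blockings := make(chan blocking) // owned by the (simulated) connection
	done := make(chan struct{})      // owned by the (simulated) publisher

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		watchBlocked(blockings, done, func(active bool) {
			fmt.Println("connection blocked:", active)
		})
	}()

	blockings <- blocking{Active: true, Reason: "low memory"}
	close(done) // simulated publisher close: the watcher exits although blockings stays open
	wg.Wait()
	fmt.Println("watcher stopped")
}
```

With a plain `for range blockings` loop, the goroutine could only exit when the connection closes the channel; the extra `done` case is what ties its lifetime to the publisher.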
from go-rabbitmq.
It seems to me that the most likely issue is that you have a lot of Publish() calls getting blocked. The startNotifyBlockedHandler function waits for any in-flight publishes to finish and then stops new ones from happening. So if they are hanging indefinitely for some reason, that could be the root cause. I'd argue something at the infrastructure level is probably the issue.
You could add a context timeout, and that should stop them from hanging forever.
from go-rabbitmq.
Hello, this seems still to be an issue.
I could reproduce this goroutine leak by using the example of the readme and printing the stacktrace before and after closing the publisher. The goroutine started at startup will not terminate with closing the publisher.
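A sketch of how such a before/after check might look, using only the standard library. `fakePublisher` is a hypothetical stand-in that mimics the reported behavior (a goroutine ranging over a channel that Close never closes); it is not the library's code.

```go
package main

import (
	"fmt"
	"os"
	"runtime"
	"runtime/pprof"
	"time"
)

// fakePublisher is a hypothetical type that starts a goroutine on creation.
type fakePublisher struct {
	notify chan struct{}
}

func newFakePublisher() *fakePublisher {
	p := &fakePublisher{notify: make(chan struct{})}
	go func() {
		// Exits only when notify is closed, which Close below never does.
		for range p.notify {
		}
	}()
	return p
}

// Close deliberately leaves notify open, mirroring the leak described above.
func (p *fakePublisher) Close() {}

// leakedGoroutines creates a resource, closes it, and returns how many more
// goroutines exist afterwards than before.
func leakedGoroutines(create func() (closeFn func())) int {
	before := runtime.NumGoroutine()
	closeFn := create()
	closeFn()
	time.Sleep(50 * time.Millisecond) // give well-behaved goroutines time to exit
	return runtime.NumGoroutine() - before
}

func main() {
	leaked := leakedGoroutines(func() func() { return newFakePublisher().Close })
	fmt.Println("goroutines leaked:", leaked)

	// Dump the stacks of all live goroutines to see where the leaked one is parked.
	pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
}
```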
The problem is that the goroutine is started and passes a notify chan (I call Go channels chan to prevent confusion with RabbitMQ's Channels) to the Connection. This chan will only be closed if the connection to RabbitMQ is closed.
But a long-lived service will stay connected to RabbitMQ and therefore keep the Connection alive, only creating and closing publishers as needed. This results in the connection holding all chans in the blocks array created by startNotifyBlockedHandler, not closing them when a publisher shuts down, and keeping the goroutine alive at the for loop.
By contrast, startNotifyFlowHandler, which is also started on creation of a publisher, has its chan managed by a Channel. This Channel is closed when the publisher's Close function is called, which terminates the goroutine started by the publisher.
I have two ideas for a fix:
- your above-mentioned context, which is passed down into the function and acts as a lifetime: if the publisher is closed, this lifetime ends and the routine finishes. The routine must NOT close the chan, or it will panic when the Connection is terminated and tries to close the chan as well
- tag the chan passed and notify the connection to remove it on publisher close. This would require changes in the wrapped library as well
I could implement the first. The second option would need discussion and implementation in the amqp091-go library first.
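The warning in the first option rests on a general Go rule: closing a channel that is already closed panics. A tiny standard-library sketch of that rule, where `closeTwice` is a hypothetical helper and the two `close` calls stand for the publisher and the connection each closing the same chan:

```go
package main

import "fmt"

// closeTwice closes ch twice and returns the recovered panic message, or an
// empty string if no panic occurred.
func closeTwice(ch chan struct{}) (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	close(ch) // e.g. the publisher closing the notify chan on Close
	close(ch) // e.g. the connection closing the same chan on shutdown
	return ""
}

func main() {
	fmt.Printf("recovered panic: %q\n", closeTwice(make(chan struct{})))
}
```

This is why a lifetime-based fix would only stop receiving from the chan and leave closing it to its owner.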
Take care!
from go-rabbitmq.
I wrap rabbitmq.NewPublisher in once.Do. Reusing this connection has not caused any problems in large-scale, high-concurrency testing.
from go-rabbitmq.
For me this is not a fix for the problem. It does not change the fact that even initializing a single publisher and closing it will not stop all goroutines started by that publisher, which results in a leaked goroutine.
Furthermore, someone can have multiple publishers, configured with different publisher options, that are written to simultaneously. So I'm confused about how it should be possible to reuse a publisher that was created with different settings and does not allow changing them. Can you elaborate on this?
from go-rabbitmq.
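import (
	"sync"

	rabbitmq "github.com/wagslane/go-rabbitmq"
	"go.uber.org/zap"
	// plus the project-local logging package and the GetMqUrl helper
)

var (
	once         sync.Once
	oncePublish  sync.Once
	conn         *rabbitmq.Conn
	publisher    *rabbitmq.Publisher
	publisherErr error
)

// GetMqConn returns the process-wide connection, dialing it on first use.
// It returns nil if the initial connection attempt failed.
func GetMqConn() *rabbitmq.Conn {
	once.Do(func() {
		var err error
		conn, err = rabbitmq.NewConn(GetMqUrl())
		if err != nil {
			logging.Logger.Error("rabbitmq conn err", zap.Error(err))
		}
	})
	return conn
}

// GetPublisher returns the shared publisher, creating it on first use.
// The creation error is kept in publisherErr so that later calls report it
// too, instead of returning a nil publisher with a nil error.
func GetPublisher() (*rabbitmq.Publisher, error) {
	oncePublish.Do(func() {
		publisher, publisherErr = rabbitmq.NewPublisher(
			GetMqConn(),
			rabbitmq.WithPublisherOptionsLogging,
		)
	})
	return publisher, publisherErr
}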
Our business is relatively simple, with only one configuration. We can use GetPublisher globally, and it has withstood high-concurrency testing. If you have multiple configurations, my suggestion is to put the differently configured Publishers into a Publisher pool, so that Publishers with the same configuration are reused.
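One way such a pool could be sketched, using only the standard library. The `pool` type, `newPool`, and the string keys are hypothetical; in real use the factory would create a publisher for the given configuration, and the key would need to identify that configuration uniquely.

```go
package main

import (
	"fmt"
	"sync"
)

// pool caches one value per configuration key and creates each value at most once.
type pool[T any] struct {
	mu      sync.Mutex
	items   map[string]T
	factory func(key string) (T, error)
}

func newPool[T any](factory func(key string) (T, error)) *pool[T] {
	return &pool[T]{items: make(map[string]T), factory: factory}
}

// Get returns the cached value for key, creating it on first use.
// Failed creations are not cached, so a later call retries.
func (p *pool[T]) Get(key string) (T, error) {
	p.mu.Lock()
	defer p.mu.Unlock()

	if item, ok := p.items[key]; ok {
		return item, nil
	}
	item, err := p.factory(key)
	if err != nil {
		var zero T
		return zero, err
	}
	p.items[key] = item
	return item, nil
}

func main() {
	created := 0
	publishers := newPool(func(key string) (string, error) {
		created++ // safe here: the factory only runs while the pool's mutex is held
		return "publisher for " + key, nil
	})

	for _, key := range []string{"orders", "audit", "orders"} {
		p, _ := publishers.Get(key)
		fmt.Println(p)
	}
	fmt.Println("created:", created) // "orders" is created once and then reused
}
```

Holding the mutex while the factory runs keeps the sketch simple but serializes creation across keys; a per-key sync.Once would avoid that at the cost of more code.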
from go-rabbitmq.
Ah, thank you for your example @lmb1113. We have a different use case: we open a lot of publishers because each client gets its own queue and exchange for incoming messages.
from go-rabbitmq.
Hello,
I don't want to necro-bump, but @wagslane, should I open a new issue for this? Or can this issue be reopened?
from go-rabbitmq.
Hello, I noticed that each publisher starts its own startNotifyBlockedHandler. Can we adjust it so that a single handler is used per conn? My understanding is that the purpose of startNotifyBlockedHandler is to prevent publishing while the connection is not available.
@mjurczik @wagslane
To reproduce the memory leak: connect once globally, then create multiple publishers.
from go-rabbitmq.
publisher.startNotifyBlockedHandler() -> conn.startNotifyBlockedHandler()?
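A rough standard-library sketch of what moving the handler to the connection could look like. The `conn`, `publisher`, and `blocking` types here are hypothetical and simplified; the library's real handler may do more than set a flag.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// blocking is a hypothetical stand-in for the blocked notification value.
type blocking struct{ Active bool }

// conn is a hypothetical connection wrapper that tracks the blocked state
// once, on behalf of every publisher that shares it.
type conn struct {
	blocked atomic.Bool
	stopped chan struct{} // closed when the single watcher goroutine exits
}

// newConn starts exactly one watcher goroutine per connection. It exits when
// the connection closes the blockings channel.
func newConn(blockings <-chan blocking) *conn {
	c := &conn{stopped: make(chan struct{})}
	go func() {
		defer close(c.stopped)
		for b := range blockings {
			c.blocked.Store(b.Active)
		}
	}()
	return c
}

// publisher starts no goroutine of its own; it only reads the shared flag.
type publisher struct{ conn *conn }

func (p *publisher) Publish(msg string) error {
	if p.conn.blocked.Load() {
		return fmt.Errorf("connection is blocked, not publishing %q", msg)
	}
	return nil // a real publisher would send msg here
}

func main() {
	blockings := make(chan blocking)
	c := newConn(blockings)
	a, b := &publisher{conn: c}, &publisher{conn: c}

	blockings <- blocking{Active: true}
	close(blockings) // simulated connection shutdown
	<-c.stopped      // wait until the watcher has processed everything

	fmt.Println(a.Publish("x") != nil, b.Publish("y") != nil)
}
```

Because the watcher's lifetime matches the connection's, creating and closing any number of publishers leaves the goroutine count unchanged in this sketch.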
from go-rabbitmq.