
go-rabbitmq's Introduction

Hi, I'm Lane Wagner

  • 🖥️ I've been coding in Go primarily for over 6 years
  • 🏗️ I'm the founder of Boot.dev
  • 🦀 I’m learning Rust and Vue.js at the moment
  • 🐦 You can follow me on Twitter @wagslane
  • 🎤 You can also listen to my podcast on BackendBanter.fm

go-rabbitmq's People

Contributors

aaqaishtyaq, abbasegbeyemi, ajmckee, ardinusawan, brianmori, buni, ckoehn, felixhuettner, fortyanov, h44z, hashbou, hugowetterberg, humbertovnavarro, jastbytes, johanneswuerbach, julienschmidt, ldmi3i, miguelb, miketonks, perriea, pwli0755, qiu-zzz, thde, thepabloaguilar, tomarus, tombrouws, tscheckenbach, victorges, vishal-android-freak, wagslane


go-rabbitmq's Issues

WithPublisherOptionsReconnectInterval is a ConsumerOption. How to use it?

I'm trying to get this option working: currently the publisher gets disconnected but does not retry...

Error publishing message: Exception (504) Reason: "channel/connection is not open\

err := publisher.Publish(
	[]byte(data),
	[]string{keyRouter},
	rabbitmq.WithPublishOptionsContentType(publishOptions),
	rabbitmq.WithPublishOptionsMandatory,
	rabbitmq.WithPublishOptionsPersistentDelivery,
	rabbitmq.WithPublishOptionsExchange(exchange),
)

publisher, err := rabbitmq.NewPublisher(
	uriRabbit,
	amqp091.Config{},
	// can pass nothing for no logging
	rabbitmq.WithPublisherOptionsLogging,
)

I see in the code that the default is 5 seconds, but it doesn't seem to kick in with my current config, as I keep getting the same error message non-stop.
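For reference, a minimal application-level workaround sketch while the built-in retry isn't kicking in: retry the publish in a loop so the library's reconnect has time to restore the channel. This assumes NewPublisher returns a *rabbitmq.Publisher as in the snippets above; the URL, routing key, attempt count, and sleep are placeholders, not library defaults.

package main

import (
	"log"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
	rabbitmq "github.com/wagslane/go-rabbitmq"
)

// publishWithRetry is a hypothetical application-level wrapper: it retries the
// publish a few times, giving the library's reconnect loop time to restore the
// channel in between. The attempt count and sleep are arbitrary.
func publishWithRetry(publisher *rabbitmq.Publisher, data []byte, routingKey string) error {
	var err error
	for attempt := 0; attempt < 5; attempt++ {
		err = publisher.Publish(
			data,
			[]string{routingKey},
			rabbitmq.WithPublishOptionsPersistentDelivery,
		)
		if err == nil {
			return nil
		}
		time.Sleep(5 * time.Second)
	}
	return err
}

func main() {
	publisher, err := rabbitmq.NewPublisher(
		"amqp://guest:guest@localhost",
		amqp.Config{},
		rabbitmq.WithPublisherOptionsLogging,
	)
	if err != nil {
		log.Fatal(err)
	}
	if err := publishWithRetry(publisher, []byte("hello"), "routing_key"); err != nil {
		log.Fatal(err)
	}
}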

doubt about startNotifyFlowHandler

func (publisher *Publisher) startNotifyFlowHandler() {
	for ok := range publisher.notifyFlowChan {
		publisher.disablePublishDueToFlowMux.Lock()
		publisher.logger.Printf("pausing publishing due to flow request from server")
		if ok {
			publisher.disablePublishDueToFlow = false
		} else {
			publisher.disablePublishDueToFlow = true
		}
		publisher.disablePublishDueToFlowMux.Unlock()
		publisher.logger.Printf("resuming publishing due to flow request from server")
	}
}

but I see the following comment in amqp (channel.go#L49):

	// Listeners for active=true flow control.  When true is sent to a listener,
	// publishing should pause until false is sent to listeners.
	flows []chan bool

Maybe startNotifyFlowHandler should behave like this?

func (publisher *Publisher) startNotifyFlowHandler() {
	// Listeners for active=true flow control.  When true is sent to a listener,
	// publishing should pause until false is sent to listeners.
	for ok := range publisher.notifyFlowChan {
		publisher.disablePublishDueToFlowMux.Lock()
		if ok {
			publisher.logger.Printf("pausing publishing due to flow request from server")
			publisher.disablePublishDueToFlow = true
		} else {
			publisher.disablePublishDueToFlow = false
			publisher.logger.Printf("resuming publishing due to flow request from server")
		}
		publisher.disablePublishDueToFlowMux.Unlock()
	}
}

Please let me know if I got this wrong.
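For context, a hedged sketch of how the publish path presumably consults this flag (the field names are taken from the snippets above; the helper method itself is made up), which is why the inverted assignment matters:

// Hypothetical helper: the publish path would check disablePublishDueToFlow
// under the same mutex before sending to the server.
func (publisher *Publisher) canPublish() bool {
	publisher.disablePublishDueToFlowMux.Lock()
	defer publisher.disablePublishDueToFlowMux.Unlock()
	return !publisher.disablePublishDueToFlow
}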

Access refused when trying to bind router to queue

Dear sir,
I have "ACCESS_REFUSED - operation not permitted on the default exchange" error when trying your consumer example.
This is my code

consumer, err := rabbitmq.NewConsumer(env.RabbitMqUrl)
	handleError(err, "Create consumer", true)

	err = consumer.StartConsuming(
		func(d rabbitmq.Delivery) bool {
			log.Printf("Consumed: %s", d.Body)

			return true
		},
		"runner_sandbox_worker",
		[]string{"runner_sandbox_routing"},
		rabbitmq.WithConsumeOptionsConcurrency(10),
		rabbitmq.WithConsumeOptionsQueueDurable,
		rabbitmq.WithConsumeOptionsQuorum,
	)
	handleError(err, "Start consuming", true)

Thanks!

Consumer won't reconnect

Hi,

Somehow, when the network got disconnected for a minute or so and then came back up, I got this log:

gorabbit: rabbit consumer goroutine closed

After that, the consumer stops receiving new messages.
Why won't it reconnect?

I'm using version 0.6.2, and my code is (more or less) like this

rabbitmqConsumer, err := rabbitmq.NewConsumer(config.constructURL(), amqp.Config{}, rabbitmq.WithConsumerOptionsLogging)
...
rabbitmqConsumer.StartConsuming(
  func(message rabbitmq.Delivery) bool {
    return true
  },
  "",
  []string{""},
  func(options *rabbitmq.ConsumeOptions) {
    options.QueueExclusive = true
    options.ConsumerExclusive = true
    options.QueueDurable = true
    options.BindingExchange = &rabbitmq.BindingExchangeOptions{
      Name:    "my_rabbitmq_topic",
      Kind:    "fanout",
      Durable: true,
    }
  },
)

Make channelManager public

Hi !

Thanks for this useful lib!
I'm wondering if the channelManager struct could be exposed (renamed to ChannelManager) to be imported.
It could also be useful to expose the channelManager struct values of Consumer / Publisher.

My use case is that I'd like to do manual creation of exchange and queues.
I can create a new AMQP channel, but I'd lose the auto-reconnect handling of the channelManager.

Thanks!
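For reference, the manual declaration I have in mind looks roughly like this with a raw amqp091-go channel (a sketch; the exchange, queue, and routing-key names are placeholders), and this is exactly where I'd lose the channelManager's auto-reconnect:

package main

import (
	"log"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		log.Fatal(err)
	}
	defer ch.Close()

	// Manually declare an exchange and a queue, then bind them.
	if err := ch.ExchangeDeclare("events", "topic", true, false, false, false, nil); err != nil {
		log.Fatal(err)
	}
	q, err := ch.QueueDeclare("my_queue", true, false, false, false, nil)
	if err != nil {
		log.Fatal(err)
	}
	if err := ch.QueueBind(q.Name, "routing_key", "events", false, nil); err != nil {
		log.Fatal(err)
	}
}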

Allow consumers to disable requeuing of messages

Currently, the delivery is either acknowledged or not based on the bool return value of the consumer, and on a nack it is also automatically requeued.
In some cases I do not want the delivery to be requeued, for example when its body contains invalid JSON and it needs to be sent to a dead-letter queue for further introspection.

I can imagine that a quick solution would be to change the return value of the consumer to something like func(d Delivery) (bool, bool) where the second bool indicates if it should be requeued or not. I can easily create a PR for that behaviour but wanted to check it first.
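To illustrate, a hypothetical handler using the proposed signature, where the second bool means "requeue": invalid JSON is nacked without requeue so it can flow to a dead-letter queue instead of looping forever.

// Hypothetical handler for the proposed func(d Delivery) (bool, bool) signature.
// Needs "encoding/json" and the go-rabbitmq package imported.
func handle(d rabbitmq.Delivery) (ack bool, requeue bool) {
	if !json.Valid(d.Body) {
		return false, false // nack, do not requeue -> dead-letter it
	}
	return true, false // ack
}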

Each consumer/producer has its own connection?

Hello,
from my understanding of the code, each consumer/publisher will have its own connection, which contradicts the recommendation of 1 connection per application. In theory, every consumer/producer should use the same connection but have its own channel? Is this intentional by design, or is there room for improvement here?

Improve document regarding exchange

So, I spent about 2 hours trying to figure out why my published messages weren't arriving at my consumer, and it turns out I should have been using the queueName as the routing key for the publisher!

I only discovered this after checking whether go-rabbitmq was creating an exchange for me; it turns out that, if left unspecified, it binds to the default RabbitMQ exchange:

The default exchange is a direct exchange with no name (empty string) pre-declared by the broker. It has one special property that makes it very useful for simple applications: every queue that is created is automatically bound to it with a routing key which is the same as the queue name.

https://www.rabbitmq.com/tutorials/amqp-concepts.html

So the examples in the README don't actually show a working pub-sub cycle, which is a little confusing.
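In other words, with the default exchange the publisher has to use the queue name as the routing key. A sketch of what finally worked for me (names are placeholders; constructor and option names as used in the other examples in these issues):

package main

import (
	"log"

	rabbitmq "github.com/wagslane/go-rabbitmq"
)

func main() {
	// Constructor as in the README examples quoted in these issues; the URL
	// and rabbitmq.Config{} are placeholders.
	publisher, err := rabbitmq.NewPublisher("amqp://guest:guest@localhost", rabbitmq.Config{})
	if err != nil {
		log.Fatal(err)
	}
	defer publisher.Close()

	// With the default (nameless) exchange, the routing key must equal the
	// queue name the consumer reads from.
	err = publisher.Publish(
		[]byte("hello, world"),
		[]string{"my_queue"}, // routing key == queue name
		rabbitmq.WithPublishOptionsContentType("application/json"),
	)
	if err != nil {
		log.Fatal(err)
	}
}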

How does reconnection work?

I'm looking at this code (go-rabbitmq/channel.go, lines 54 to 86 at 98402bf):

func (chManager *channelManager) startNotifyCancelOrClosed() {
	notifyCloseChan := make(chan *amqp.Error)
	notifyCloseChan = chManager.channel.NotifyClose(notifyCloseChan)
	notifyCancelChan := make(chan string)
	notifyCancelChan = chManager.channel.NotifyCancel(notifyCancelChan)
	select {
	case err := <-notifyCloseChan:
		// If the connection close is triggered by the Server, a reconnection takes place
		if err.Server {
			chManager.logger.Printf("attempting to reconnect to amqp server after close")
			chManager.reconnectWithBackoff()
			chManager.logger.Printf("successfully reconnected to amqp server after close")
			chManager.notifyCancelOrClose <- err
		}
	case err := <-notifyCancelChan:
		chManager.logger.Printf("attempting to reconnect to amqp server after cancel")
		chManager.reconnectWithBackoff()
		chManager.logger.Printf("successfully reconnected to amqp server after cancel")
		chManager.notifyCancelOrClose <- errors.New(err)
	}
	// these channels can be closed by amqp
	select {
	case <-notifyCloseChan:
	default:
		close(notifyCloseChan)
	}
	select {
	case <-notifyCancelChan:
	default:
		close(notifyCancelChan)
	}
}
and I don't see it blocking. So how does the reconnection work?

Shall I keep the publisher object alive in a long-running program?

Hi, Wagner!

What's the best way to use NewPublisher?

Shall I init a Publisher as a global object, or just create a new one every time I need to publish a message?

Considering that while publishing messages the connection may be reset and rebuilt in the chManager, it seems we need to re-register NotifyReturn and NotifyFlow once the connection is rebuilt.
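For what it's worth, my current approach is a single long-lived publisher initialized once and shared across goroutines; a sketch, assuming NewPublisher returns a *rabbitmq.Publisher and the option names used elsewhere in these issues:

package main

import (
	"log"
	"sync"

	rabbitmq "github.com/wagslane/go-rabbitmq"
)

var (
	publisherOnce sync.Once
	publisher     *rabbitmq.Publisher
)

// getPublisher lazily creates one shared publisher for the whole process and
// relies on the library's internal reconnect handling, instead of creating a
// new publisher per message.
func getPublisher() *rabbitmq.Publisher {
	publisherOnce.Do(func() {
		p, err := rabbitmq.NewPublisher(
			"amqp://guest:guest@localhost",
			rabbitmq.Config{},
			rabbitmq.WithPublisherOptionsLogging,
		)
		if err != nil {
			log.Fatal(err)
		}
		publisher = p
	})
	return publisher
}

func main() {
	if err := getPublisher().Publish([]byte("hello"), []string{"routing_key"}); err != nil {
		log.Fatal(err)
	}
}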

AutoReconnect only reconnects the consumer to 1 queue instead of all the queues

Sorry, I'm new to Go. I created 1 consumer to consume multiple queues, but when the connection dropped and the reconnect mechanism started, it successfully reconnected, but only to 1 queue. Below is my code for the POC:

// main.go
package main

import (
	"log"
	"strings"
	"sync"

	. "poc-reconnect/worker/east"
	. "poc-reconnect/worker/west"

	rabbitmq "github.com/wagslane/go-rabbitmq"
)

var workerList = make(map[string]interface {
	Execute(wg *sync.WaitGroup, concurrent int)
})

func main() {
	amqpAddress := "amqp://guest:guest@localhost:5672/poc"

	// initialize consumer
	consumer, err := rabbitmq.NewConsumer(
		amqpAddress, rabbitmq.Config{},
	)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	// Register the workers
	workerList["west"] = West{Consumer: consumer}
	workerList["east"] = East{Consumer: consumer}

	// worker concurrent configuration
	workerConcurrent := map[string]int{
		"west": 2,
		"east": 2,
	}

	wg := sync.WaitGroup{}

	// run all worker
	for worker, thread := range workerConcurrent {
		wg.Add(1)
		go workerList[strings.ToLower(worker)].Execute(&wg, thread)
	}
	wg.Wait()
}
// worker/east/east.go
package east

import (
	"log"
	"sync"

	rabbitmq "github.com/wagslane/go-rabbitmq"
)

type East struct {
	Consumer rabbitmq.Consumer
}

const work_queue = "east_queue"

func (e East) Execute(wg *sync.WaitGroup, concurrent int) {
	defer wg.Done()

	forever := make(chan bool)
	// Subscribing to the queue
	err := e.Consumer.StartConsuming(
		func(d rabbitmq.Delivery) rabbitmq.Action {
			log.Printf("Received message from %s with content: %s\n", work_queue, d.Body)
			// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
			return rabbitmq.Ack
		},
		work_queue,
		[]string{work_queue},
		rabbitmq.WithConsumeOptionsConcurrency(concurrent),
		rabbitmq.WithConsumeOptionsQueueDurable,
		rabbitmq.WithConsumeOptionsBindingExchangeName("exchange"),
		rabbitmq.WithConsumeOptionsBindingExchangeKind("direct"),
		rabbitmq.WithConsumeOptionsBindingExchangeDurable,
	)

	if err != nil {
		log.Fatal(err)
	}

	log.Printf("Subscribing to %s for getting messages on %d goroutines\n", work_queue, concurrent)

	<-forever
}
// worker/west/west.go
package west

import (
	"log"
	"sync"

	rabbitmq "github.com/wagslane/go-rabbitmq"
)

type West struct {
	Consumer rabbitmq.Consumer
}

const work_queue = "west_queue"

func (w West) Execute(wg *sync.WaitGroup, concurrent int) {
	defer wg.Done()

	forever := make(chan bool)
	// Subscribing to the queue
	err := w.Consumer.StartConsuming(
		func(d rabbitmq.Delivery) rabbitmq.Action {
			log.Printf("Received message from %s with content: %s\n", work_queue, d.Body)
			// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
			return rabbitmq.Ack
		},
		work_queue,
		[]string{work_queue},
		rabbitmq.WithConsumeOptionsConcurrency(concurrent),
		rabbitmq.WithConsumeOptionsQueueDurable,
		rabbitmq.WithConsumeOptionsBindingExchangeName("exchange"),
		rabbitmq.WithConsumeOptionsBindingExchangeKind("direct"),
		rabbitmq.WithConsumeOptionsBindingExchangeDurable,
	)

	if err != nil {
		log.Fatal(err)
	}

	log.Printf("Subscribing to %s for getting messages on %d goroutines\n", work_queue, concurrent)

	<-forever
}

On the first run, the consumer is correctly initialized (screenshots omitted).

But after I force-close the connection through the RabbitMQ UI to trigger the reconnect mechanism, it only reconnects to 1 queue (screenshots omitted).

Is this a bug, or did I do something wrong in my code?

Thanks

Can we set a consumer timeout?

Hello, I read the RabbitMQ documentation; the default ack timeout for RabbitMQ is 30 minutes.
I want to adjust this. Is go-rabbitmq able to set a consumer ack timeout?
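If it helps, a hedged fragment of what I'm trying: passing the timeout as a queue argument via WithConsumeOptionsQueueArgs (seen in another issue here). Whether RabbitMQ honors a per-queue x-consumer-timeout argument depends on the server version, and the rabbitmq.Table type is an assumption; otherwise the timeout has to be changed via the server-wide consumer_timeout setting.

// Fragment: consumer and handler are defined elsewhere; the argument name,
// value (milliseconds), and Table type are assumptions, not confirmed API.
queueArgs := rabbitmq.Table{"x-consumer-timeout": 1800000} // 30 minutes
err = consumer.StartConsuming(
	handler,
	"my_queue",
	[]string{"routing_key"},
	rabbitmq.WithConsumeOptionsQueueArgs(queueArgs),
)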

`channelManager` uses empty `amqp.Config` on reconnect

I am facing an issue using mTLS with RabbitMQ and this library. When the connection goes down and the channelManager reconnects, it uses the config member, but newChannelManager never sets it on the channelManager instance. As a result, it reconnects using an empty amqp.Config that lacks the CA and client certificate needed for mTLS, and fails to validate the server's certificate.
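For reference, this is roughly how the config is built on my side (a sketch; file paths and URL are placeholders, constructor shape as in the other examples here), so reconnecting with an empty amqp.Config drops the CA and client certificate:

package main

import (
	"crypto/tls"
	"crypto/x509"
	"log"
	"os"

	amqp "github.com/rabbitmq/amqp091-go"
	rabbitmq "github.com/wagslane/go-rabbitmq"
)

func main() {
	// Build the TLS config with the CA and client certificate needed for mTLS.
	caCert, err := os.ReadFile("/etc/rabbitmq/ca.pem")
	if err != nil {
		log.Fatal(err)
	}
	pool := x509.NewCertPool()
	pool.AppendCertsFromPEM(caCert)

	clientCert, err := tls.LoadX509KeyPair("/etc/rabbitmq/client.pem", "/etc/rabbitmq/client.key")
	if err != nil {
		log.Fatal(err)
	}

	cfg := amqp.Config{
		TLSClientConfig: &tls.Config{
			RootCAs:      pool,
			Certificates: []tls.Certificate{clientCert},
		},
	}

	// The config is passed in here, but is lost when channelManager
	// reconnects with an empty amqp.Config.
	consumer, err := rabbitmq.NewConsumer("amqps://rabbit.example.com:5671", cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()
}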

Why is Exchange configuration driven by Consumers instead of Publishers

This library seems to have Consumers drive the configuration of the Exchanges rather than the Publishers. For example, Consumers determine if an exchange should auto delete, be durable, or the kind (e.g. topic).

Sample:

	err = consumer.StartConsuming(
		func(d rmq.Delivery) rmq.Action {
			log.Printf("consumed: %v", string(d.Body))
			// true to ACK, false to NACK
			return rmq.Ack
		},
		QUEUE,
		[]string{TOPIC},
		rmq.WithConsumeOptionsBindingExchangeAutoDelete,
		rmq.WithConsumeOptionsBindingExchangeDurable,
		rmq.WithConsumeOptionsBindingExchangeName(""),
		rmq.WithConsumeOptionsBindingExchangeKind("topic"),
		rmq.WithConsumeOptionsQueueArgs(queueArgs),
	)

Isn't this a bit backwards, since Publishers are the ones who write to Exchanges and should therefore configure their settings, while Consumers simply configure the queues that they use? Was this design selected out of personal preference, or was there a technical reason for it?

Reconnect on disconnect panics with close of closed channel

RabbitMQ Version: 3.8.9
Go Version: go1.15.3 darwin/amd64
Reproduce:
Using the example consumer and RabbitMQ 3.8.9 within a Docker container, a forced restart of RabbitMQ initiates the channelManager reconnection strategy. However, on reconnection to the RabbitMQ service the channel is closed and a panic is issued with "close of closed channel".

Desired behaviour: reconnection and rebinding of consumer handlers.

2021/04/23 07:13:12 gorabbit: waiting 8s seconds to attempt to reconnect to amqp server
2021/04/23 07:13:20 gorabbit: successfully reconnected to amqp server after close
2021/04/23 07:13:20 gorabbit: successfully reconnected to amqp server after close
panic: close of closed channel

goroutine 36 [running]:
github.com/wagslane/go-rabbitmq.(*channelManager).startNotifyCancelOrClosed(0xc000140600)
        /Users/ajmckee/go/pkg/mod/github.com/wagslane/[email protected]/channel.go:69 +0x294
created by github.com/wagslane/go-rabbitmq.newChannelManager
        /Users/ajmckee/go/pkg/mod/github.com/wagslane/[email protected]/channel.go:32 +0x167
panic: close of closed channel

Publish method error

Is the parameter name wrong? The routingKeys parameter of the Publish method should correspond to the queue parameter of StartConsuming. This mistake had me changing my code and debugging late into the night.

How to publish with confirms

confirms := make(chan amqp.Confirmation)
ch.NotifyPublish(confirms)
go func() {
	for confirm := range confirms {
		if confirm.Ack {
			// code when message is confirmed
			log.Printf("Confirmed")
		} else {
			// code when message is nack-ed
			log.Printf("Nacked")
		}
	}
}()

Use lowercase "f" for Logger interface functions

This is not a real issue, but I would suggest using a lowercase f for your Logger interface functions, e.g. Errorf(string, ...interface{}) instead of ErrorF(string, ...interface{}).

This way, it is much easier to use existing logger implementations directly, as this is more or less the default in the Go landscape (IMHO). In addition, the Go stdlib functions like Printf() use it this way.

Truncated messages

Hello,
Sometimes I receive a truncated message:
{"position":{"latitude":0.0,"longitude":0.0,"altitude":0.0},"device":{"id":1,"name":"1111","phone":nu
It is supposed to be like this:
{"position":{"latitude":0.0,"longitude":0.0,"altitude":0.0},"device":{"id":1,"name":"1111","phone":null}}

And sometimes I receive a message with an extra string appended, too:
{"position":{"latitude":0.0,"longitude":0.0,"altitude":0.0},"device":{"id":1,"name":"1111","phone":null}}1111","phone":null}}

How to solve this issue?

Allow consumers to disable declaration of queue

I'm running into an error running the consumer in our UAT and production environments.

2021/11/30 12:12:17 gorabbit: attempting to reconnect to amqp server after close
2021/11/30 12:12:17 gorabbit: waiting 1s seconds to attempt to reconnect to amqp server
2021/11/30 12:12:17 Failed to consume: %sException (403) Reason: "ACCESS_REFUSED - access to queue '' in vhost '' refused for user '*****'"
exit status 1

The RabbitMQ server is managed by another team, and it looks like we don't have permission to declare queues. I can consume fine with the same credentials using the raw amqp library, but cannot declare a queue.

Feels like this should be optional.

I noticed the following PR which looks like it should solve the issue: #49

data race when calling Publisher.Close()

I thought it was my code, since I store the publisher in a global var and then access it from goroutines, assuming that would be safe, and now I'm having second thoughts about that assumption. But your example code demonstrating how to use the Publisher has the same problem.

This code:

package main

import (
	"log"

	"github.com/wagslane/go-rabbitmq"
)

func main() {
	publisher, err := rabbitmq.NewPublisher(
		"amqp://user:password@localhost",
		rabbitmq.Config{},
		// can pass nothing for no logging
		rabbitmq.WithPublisherOptionsLogging,
	)
	if err != nil {
		log.Fatal(err)
	}
	// do this after err check or publisher might be nil
	defer publisher.Close()

	err = publisher.Publish(
		[]byte("hello, world"),
		[]string{"routing_key"},
		rabbitmq.WithPublishOptionsContentType("application/json"),
		rabbitmq.WithPublishOptionsMandatory,
		rabbitmq.WithPublishOptionsPersistentDelivery,
	)
	if err != nil {
		log.Fatal(err)
	}

	returns := publisher.NotifyReturn()
	go func() {
		for r := range returns {
			log.Printf("message returned from server: %s", string(r.Body))
		}
	}()
}

compiled with:

go build -race

produces this output:

~/g/s/g/r/dataRace> go build -race && ./dataRace
2022/04/14 17:08:50 message returned from server: hello, world
==================
WARNING: DATA RACE
Read at 0x00c000216068 by main goroutine:
  main.main()
      /home/rakete/go/src/github.com/rakete/dataRace/main.go:37 +0x3b7

Previous write at 0x00c000216068 by goroutine 13:
  github.com/wagslane/go-rabbitmq.(*Publisher).startNotifyFlowHandler()
      /home/rakete/go/pkg/mod/github.com/wagslane/[email protected]/publish.go:208 +0xee

Goroutine 13 (running) created at:
  github.com/wagslane/go-rabbitmq.NewPublisher()
      /home/rakete/go/pkg/mod/github.com/wagslane/[email protected]/publish.go:111 +0x492
  main.main()
      /home/rakete/go/src/github.com/rakete/dataRace/main.go:10 +0xfd
==================
2022/04/14 17:08:50 gorabbit: closing publisher...
2022/04/14 17:08:50 gorabbit: amqp channel closed gracefully
Found 1 data race(s)

The data race is detected almost every time, but sometimes it works fine. Run it a couple of times to see the problem.

how about TCP level reconnection?

Thank you for this nice project!

I'm new to rabbitmq, here is my question:
It seems this lib handles reconnection at the amqp.Channel level; should we handle the underlying TCP connection (amqp.Connection) as well?

func (c *Connection) NotifyClose(receiver chan *Error) chan *Error {
	c.m.Lock()
	defer c.m.Unlock()

	if c.noNotify {
		close(receiver)
	} else {
		c.closes = append(c.closes, receiver)
	}

	return receiver
}

memory leak

func (chManager *channelManager) startNotifyCancelOrClosed() {
	notifyCloseChan := chManager.channel.NotifyClose(make(chan *amqp.Error, 1))
	notifyCancelChan := chManager.channel.NotifyCancel(make(chan string, 1))

The NotifyClose and NotifyCancel methods both add a listener to the current channel and return the notification channel.

select {
case err := <-notifyCloseChan:
	...
case err := <-notifyCancelChan:
	...
}

If either channel returns an error, you have already registered NotifyClose and NotifyCancel listeners on both, and the original notification channel will remain registered in AMQP, which should be considered a memory-leak risk.
If case err := <-notifyCancelChan is triggered every time, more and more closed listener notification channels accumulate inside AMQP, while all you actually need is one.

What happens when Concurrency is 0?

From the code:

// WithConsumeOptionsConcurrency returns a function that sets the concurrency, which means that
// many goroutines will be spawned to run the provided handler on messages

Does that mean the option simply handles goroutines and has no effect on the number of underlying consumers - the actual RabbitMQ consumers?

When the server side is under maintenance, the client will not reconnect successfully after the server side starts again.

How do I handle the case where the server side is under maintenance and the client does not reconnect successfully after the server side starts again?

2022/08/09 03:07:15 gorabbit ERROR: attempting to reconnect to amqp server after close with error: Exception (501) Reason: "read tcp IP:54358->IP:5075: i/o timeout"
2022/08/09 03:07:15 gorabbit INFO: waiting 1s seconds to attempt to reconnect to amqp server
2022/08/09 03:07:16 gorabbit WARN: successfully reconnected to amqp server
2022/08/09 03:07:16 gorabbit INFO: successful recovery from: Exception (501) Reason: "read tcp IP:54358->IP:5075: i/o timeout"
2022/08/09 03:07:16 gorabbit INFO: Processing messages on 10 goroutines
2022/08/10 13:03:52 gorabbit INFO: closing consumer...
2022/08/10 13:03:52 gorabbit INFO: rabbit consumer goroutine closed
2022/08/10 13:03:52 gorabbit INFO: rabbit consumer goroutine closed
2022/08/10 13:03:52 gorabbit INFO: rabbit consumer goroutine closed
2022/08/10 13:03:52 gorabbit INFO: rabbit consumer goroutine closed
2022/08/10 13:03:52 gorabbit INFO: rabbit consumer goroutine closed
2022/08/10 13:03:52 gorabbit INFO: rabbit consumer goroutine closed
2022/08/10 13:03:52 gorabbit INFO: rabbit consumer goroutine closed
2022/08/10 13:03:52 gorabbit INFO: amqp channel closed gracefully
2022/08/10 13:03:52 gorabbit INFO: rabbit consumer goroutine closed
2022/08/10 13:03:52 gorabbit INFO: rabbit consumer goroutine closed
2022/08/10 13:03:52 gorabbit INFO: rabbit consumer goroutine closed

panic: close of closed channel

Not sure how this might happen but I got:

panic: close of closed channel

goroutine 15 [running]:
github.com/wagslane/go-rabbitmq.(*channelManager).startNotifyCancelOrClosed(0xc0000c4840)
        /var/lib/jenkins/go/pkg/mod/github.com/wagslane/[email protected]/channel.go:85 +0x1c5
created by github.com/wagslane/go-rabbitmq.newChannelManager
        /var/lib/jenkins/go/pkg/mod/github.com/wagslane/[email protected]/channel.go:35 +0x21a

It is probably happening because the channel was closed inside the NotifyCancel method:

notifyCancelChan = chManager.channel.NotifyCancel(notifyCancelChan)

Add log level to Printf method of the Logger interface

Currently it is not possible to define which log level a message has: info, warning, or error.
The Printf method needs to be extended with a logLevel parameter. I can submit a PR.

It's a breaking change, but I don't really see how such a feature can be added without one.

Notify if connection is closed

Is there a way to notify my app if the RMQ connection was closed?
This is super important in different cases, like changing the app status to unhealthy.
Thanks
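A possible interim approach, if you also hold the underlying amqp091-go connection yourself: subscribe to Connection.NotifyClose (quoted in another issue here) and flip a health flag when it fires. A sketch:

package main

import (
	"log"
	"sync/atomic"

	amqp "github.com/rabbitmq/amqp091-go"
)

// 1 = healthy, 0 = unhealthy; a health endpoint would read it with atomic.LoadInt32.
var rabbitHealthy int32

// watchConnection flips the health flag when the AMQP connection closes.
func watchConnection(conn *amqp.Connection) {
	atomic.StoreInt32(&rabbitHealthy, 1)
	closes := conn.NotifyClose(make(chan *amqp.Error, 1))
	go func() {
		err := <-closes // receives on connection loss (nil on graceful close)
		atomic.StoreInt32(&rabbitHealthy, 0)
		log.Printf("amqp connection closed: %v", err)
	}()
}

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost")
	if err != nil {
		log.Fatal(err)
	}
	watchConnection(conn)
	select {} // block; a real app would serve its health endpoint here
}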

Support clients that don't handle returned messages

Following up on #29

There is a possible issue with the publisher logic due to the fact that it always creates a NotifyReturn channel around here, but doesn't really know (or even have a way to enforce) whether consumers of the library actually consume that channel.

By not controlling whether the channel will really be consumed in the end, it is possible that that background goroutine would get stuck at the exact line referenced above. This means not only a goroutine possibly leaking in the background, but the whole *amqp.Channel loop hanging as well, since they also have no protection against channels with no consumers on the other side, as you can see here.

What they do have protection on is that they won't send to a channel that hasn't been explicitly subscribed with a Notify* function call, so the implicit contract there is that if you subscribe to a channel, you should read all data sent to it, or it should have enough buffer not to hang the senders (which for cancel/closed channels can easily be 1, see #29).

My suggestion is to do something similar and not create a return channel automatically, but only if the consumer calls some function like Returns() chan Return, setting up the return channel lazily and only after the consumer expresses intent to actually process it. That would require some interface changes though, like not returning a return channel from NewPublisher, or creating an alternate constructor and updating the docs on the current one to clarify that consumers must listen to the channel if they use mandatory or immediate publishing.

Exception when stopping the consumer using `consumer.StopConsuming`

Hi, I got an exception whenever I stop the consumer by doing consumer.StopConsuming

panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x18 pc=0x5e4f38]

goroutine 54 [running]:
github.com/wagslane/go-rabbitmq.(*channelManager).startNotifyCancelOrClosed(0xc000242080)
	/go/pkg/mod/github.com/wagslane/[email protected]/channel.go:62 +0x158
created by github.com/wagslane/go-rabbitmq.newChannelManager
	/go/pkg/mod/github.com/wagslane/[email protected]/channel.go:34 +0x191

This is some basic code I'm doing:

func create_consumer() rabbitmq.Consumer {
	consumer, err := rabbitmq.NewConsumer("connection")
	if err != nil {
		log.Fatal(err)
	}
	return consumer
}

func start_consuming(consumer *rabbitmq.Consumer) {
	err := consumer.StartConsuming(
		func(d rabbitmq.Delivery) bool {
			log.Printf("consumed: %v", string(d.Body))
			time.Sleep(5 * time.Second)
			log.Printf("consumed sleep: %v", string(d.Body))
			// true to ACK, false to NACK
			return true
		},
		"testing_queue",
		[]string{"testing_queue"})
	if err != nil {
		log.Fatal(err)
	}
}

func main() {
	exit := make(chan os.Signal, 1)
	signal.Notify(exit, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)

	consumer := create_consumer()
	start_consuming(&consumer)
	<-exit

	consumer.StopConsuming()
}

NotifyPublish seems hardly usable

Since amqp091.NotifyPublish tag is an incremental id that is bound to the amqp091.Channel, we cannot identify which message we receive a publish notification for after an automatic reconnect (since the tag is silently reset to 0).
This is an issue linked to how amqp091 secretly handles this id.

If possible, it would be better for this lib to associate the amqp091.Channel tag with a unique rabbitmq.Publisher tag (one that is incremented each time we publish without error). That would remove the impact of re-connection, so the original behavior is kept.

Possible solution:

  • have an increment int in channelManager that is incremented on every successful publish when NotifyPublish is on
  • have a startIncrement int in channelManager that is equal to the increment+startIncrement of the previous channel in case of re-connection.
  • when transmitting a NotifyPublish confirmation, add the startIncrement to the tag provided by amqp091.Channel's Confirmation.

This should ensure the original behaviour is kept even with reconnection.
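A rough sketch of that bookkeeping (names are made up, not library code):

// Hypothetical bookkeeping, not library code.
type tagOffset struct {
	startIncrement uint64 // tags already consumed by previous channels
	increment      uint64 // successful publishes on the current channel
}

// onReconnect rolls the current channel's count into the start offset, so the
// new channel's tags (which restart from 1) still map to unique publisher tags.
func (t *tagOffset) onReconnect() {
	t.startIncrement += t.increment
	t.increment = 0
}

// publisherTag converts a Confirmation.DeliveryTag from the current channel
// into a monotonically increasing publisher-level tag.
func (t *tagOffset) publisherTag(channelTag uint64) uint64 {
	return t.startIncrement + channelTag
}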

Graceful shutdown

Currently, all the messages which are being processed will be re-enqueued during the interruption.

open connection problem, why throw Exception (403)

When I use this repo to connect to the RMQ server, it throws Exception (403) Reason: "username or password not allowed".

Using the github.com/rabbitmq/amqp091-go repo directly, it connects successfully.

Both tests use the same URI config string.

So is the exception caused by some config error?

consumer.StopConsuming didn't close consumer right away

In the examples:

package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
	rabbitmq "github.com/wagslane/go-rabbitmq"
)

var consumerName = "example"

func main() {
	consumer, err := rabbitmq.NewConsumer(
		"amqp://guest:guest@localhost", amqp.Config{},
		rabbitmq.WithConsumerOptionsLogging,
	)
	if err != nil {
		log.Fatal(err)
	}
	err = consumer.StartConsuming(
		func(d rabbitmq.Delivery) rabbitmq.Action {
			log.Printf("consumed: %v", string(d.Body))
			// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
			for i := 1; i < 10; i++ {
				fmt.Println(i)
				time.Sleep(1 * time.Second)
			}
			return rabbitmq.Ack
		},
		"my_queue",
		[]string{"routing_key", "routing_key_2"},
		rabbitmq.WithConsumeOptionsConcurrency(1),
		rabbitmq.WithConsumeOptionsQueueDurable,
		rabbitmq.WithConsumeOptionsQuorum,
		rabbitmq.WithConsumeOptionsBindingExchangeName("events"),
		rabbitmq.WithConsumeOptionsBindingExchangeKind("topic"),
		rabbitmq.WithConsumeOptionsBindingExchangeDurable,
		rabbitmq.WithConsumeOptionsConsumerName(consumerName),
	)
	if err != nil {
		log.Fatal(err)
	}

	// block main thread - wait for shutdown signal
	sigs := make(chan os.Signal, 1)
	done := make(chan bool, 1)

	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

	go func() {
		sig := <-sigs
		fmt.Println()
		fmt.Println(sig)
		done <- true
	}()

	fmt.Println("awaiting signal")
	<-done
	fmt.Println("stopping consumer")

	// wait for server to acknowledge the cancel
	noWait := false
	consumer.StopConsuming(consumerName, noWait)
	time.Sleep(9 * time.Second)
	consumer.Disconnect()
}
I want to finish executing the task after stopping consumption.

I stopped the program after a few seconds:

2022/01/05 19:47:08 gorabbit: Processing messages on 1 goroutines
2022/01/05 19:47:08 consumed: hello, world
1
awaiting signal
2
3
4
^C
interrupt
stopping consumer
5
6
7
8
9
2022/01/05 19:47:17 consumed: hello, world
1
2
3
4

Process finished with the exit code 0

It starts a new consume after Ctrl-C.

Implementation for graceful shutdown for consumers

There seems to be a missing case when handling SIGINT/SIGTERM signals in the implementation, if the consumers are running in goroutines.
One possible implementation would be to stop listening for new messages (closing the channel) and provide a deadline context to the running goroutines, as sketched below.

I would like to know if there is a more elegant way of handling this case; I'm also open to contributing.
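A sketch of the shape I have in mind (generic Go; the actual stop/close calls depend on what the library exposes in your version):

package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

func main() {
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

	var inFlight sync.WaitGroup

	// ... start consuming; each handler invocation does inFlight.Add(1)
	// and defer inFlight.Done() ...

	<-sigs
	log.Println("signal received, stopping consumer")

	// 1. Stop listening for new messages here (e.g. cancel the consumer).

	// 2. Give in-flight handlers a deadline to finish.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	done := make(chan struct{})
	go func() {
		inFlight.Wait()
		close(done)
	}()

	select {
	case <-done:
		log.Println("all handlers finished")
	case <-ctx.Done():
		log.Println("deadline reached, shutting down anyway")
	}

	// 3. Close the channel/connection here.
}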

publisher resource closure issues

Every time I call a business method I create a new publisher to push the message and then defer Close(); memory keeps going up. I used pprof to confirm that this is indeed the cause: the publisher is not released in time. I know it would be better to initialize a single publisher after starting the program and then publish through it, but this Close() does not make sense to me; it feels like the resources cannot be released.


Calling StopConsuming causes duplicated messages

I have a use case where we consume around 2000 m/s, but if I send a SIGTERM to the program, I execute consumer.StopConsuming() which closes both the channel and the connection at the same time, duplicating the messages currently being processed before they can even be acknowledged.

IMO consumer.StopConsuming() should call channel.Cancel and have the user decide when to close the channel and the connection, by creating another method such as consumer.Disconnect()

For example, it could be something like this:

func main() {
	exit := make(chan os.Signal, 1)

	// Read signals, so we can gracefully shutdown the server.
	signal.Notify(exit, os.Interrupt, syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL)

	// Sync group to wait on all the messages already consumed.
	wg := sync.WaitGroup{}

	// Create RabbitMQ consumer connection.
	consumer := create_consumer()

	<-exit

	mainLog.Printf("Sigkill received, stopping consumer.")

	consumer.StopConsuming() // It cancels the consumer, but doesn't disconnect the channel nor the connection

	mainLog.Printf("Consumer stopped, waiting for group to finish.")

	// time.Sleep(30*time.Second) or wait with a timeout
	wg.Wait()

	mainLog.Printf("Group finished.")

	consumer.Disconnect() // Disconnects the channel and the connection
}

Possible data race

After updating to the new version, I get:

  ==================
  WARNING: DATA RACE
  Write at 0x00c000474220 by goroutine 33:
    github.com/wagslane/go-rabbitmq.(*Publisher).startNotifyFlowHandler()
        go/pkg/mod/github.com/wagslane/[email protected]/publish.go:234 +0x70

  Previous read at 0x00c000474220 by goroutine 24:
    github.com/wagslane/go-rabbitmq.NewPublisher()
        go/pkg/mod/github.com/wagslane/[email protected]/publish.go:167 +0x3b3
    git.vonroll-infratec.com/go/meas/pkg/queue.NewPublisher()
        ws/go/meas/pkg/queue/rab_pub.go:48 +0x604
    git.vonroll-infratec.com/go/meas/pkg/queue.(*RabbitSuite).SetupTest()
        ws/go/meas/pkg/queue/rabbit_test.go:43 +0x119
    github.com/stretchr/testify/suite.Run.func1()
        go/pkg/mod/github.com/rzajac/[email protected]/suite/suite.go:148 +0x8b4
    testing.tRunner()
        /usr/local/go/src/testing/testing.go:1193 +0x202

  Goroutine 33 (running) created at:
    github.com/wagslane/go-rabbitmq.(*Publisher).startNotifyHandlers()
        go/pkg/mod/github.com/wagslane/[email protected]/publish.go:229 +0x164
    github.com/wagslane/go-rabbitmq.NewPublisher.func1()
        go/pkg/mod/github.com/wagslane/[email protected]/publish.go:160 +0x44

  Goroutine 24 (running) created at:
    testing.(*T).Run()
        /usr/local/go/src/testing/testing.go:1238 +0x5d7
    github.com/stretchr/testify/suite.runTests()
        go/pkg/mod/github.com/rzajac/[email protected]/suite/suite.go:203 +0xf7
    github.com/stretchr/testify/suite.Run()
        go/pkg/mod/github.com/rzajac/[email protected]/suite/suite.go:176 +0x944
    git.vonroll-infratec.com/go/meas/pkg/queue.TestRabbit()
        ws/go/meas/pkg/queue/rabbit_test.go:19 +0xa4
    testing.tRunner()
        /usr/local/go/src/testing/testing.go:1193 +0x202
  ==================

in my tests.

How to support RabbitMQ consumer rebalance?

I found that when the queue gets very long, starting more consumers doesn't take effect.

Then, when I restart the old consumers, the new consumers begin to consume messages from the very long queue.

Does RabbitMQ support rebalancing? And how can RabbitMQ consumer rebalancing be supported?
