go-mq's Introduction

About

This package encapsulates the creation and configuration of RabbitMQ (AMQP, https://www.amqp.org) entities such as queues, exchanges, producers, and consumers in a declarative way with a single config.

Exchanges, queues, and producers are initialized in the background.

go-mq supports both sync and async producers.

go-mq reconnects automatically when the connection is closed or a network error occurs. You can configure the delay between connection attempts with the reconnect_delay option.

Minimal Go version

1.16

Install

go get -u github.com/cheshir/go-mq/v2

API

See the GoDoc for information about the library API.

For those who prefer to learn by practice, there are working examples in the example directory.

Configuration

You can configure mq using the mq.Config struct directly or by filling it from a config file.

Supported configuration tags:

  • yaml
  • json
  • mapstructure

Available options:

dsn: "amqp://login:password@host:port/virtual_host" # Use comma separated list for cluster connection
reconnect_delay: 5s                     # Interval between connection tries. Check https://golang.org/pkg/time/#ParseDuration for details.
test_mode: false                        # Switches library to use mocked broker. Defaults to false.
exchanges:
  - name: "exchange_name"
    type: "direct"
    options:
      # Available options with default values:
      auto_delete: false
      durable: false
      internal: false
      no_wait: false
queues:
  - name: "queue_name"
    exchange: "exchange_name"
    routing_key: "route"
    # A set of arguments for the binding.
    # The syntax and semantics of these arguments depend on the exchange class.
    binding_options:
      no_wait: false
    # Available options with default values:
    options:
      auto_delete: false
      durable: false
      exclusive: false
      no_wait: false
producers:
  - name: "producer_name"
    buffer_size: 10                      # Declare how many messages we can buffer during fat messages publishing.
    exchange: "exchange_name"
    routing_key: "route"
    sync: false                          # Specify whether the producer works in sync or async mode.
    # Available options with default values:
    options:
      content_type:  "application/json"
      delivery_mode: 2                   # 1 - non persistent, 2 - persistent.
consumers:
  - name: "consumer_name"
    queue: "queue_name"
    workers: 1                           # Workers count. Defaults to 1.
    prefetch_count: 0                    # Prefetch message count per worker.
    prefetch_size: 0                     # Prefetch message size per worker.
    # Available options with default values:
    options:
      no_ack: false
      no_local: false
      no_wait: false
      exclusive: false

Error handling

All errors are accessible via exported channel:

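package main

import (
	"log"

	"github.com/cheshir/go-mq/v2"
)

func main() {
	config := mq.Config{} // Set your configuration.
	queue, err := mq.New(config)
	if err != nil {
		log.Fatal(err)
	}
	// ...

	// Read errors in a separate goroutine so the channel does not fill up.
	go handleMQErrors(queue.Error())

	// Other logic.
}

// handleMQErrors logs every error received from the channel.
func handleMQErrors(errs <-chan error) {
	for err := range errs {
		log.Println(err)
	}
}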

If the channel is full, new errors are dropped.

Errors from a sync producer are not sent to the error channel because they are returned directly to the caller.

Tests

Some cases can only be tested with a real broker, and some only with a mocked broker.

If you are able to run tests with a real broker, run them with:

go test -mock-broker=0

Otherwise the mocked broker is used.
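
go test passes flags it does not recognize to the test binary, where the standard flag package can define them. The sketch below shows how a boolean flag named mock-broker with a default of true could be parsed; only the flag name and its default behaviour come from the text above, while parseMockBroker and the use of a separate FlagSet are assumptions made to keep the example self-contained.

```go
package main

import (
	"flag"
	"fmt"
)

// parseMockBroker parses args and reports whether the mocked broker should be used.
// Hypothetical helper; the library's tests may define the flag differently.
func parseMockBroker(args []string) (bool, error) {
	fs := flag.NewFlagSet("test", flag.ContinueOnError)
	mock := fs.Bool("mock-broker", true, "use the mocked broker instead of a real one")
	if err := fs.Parse(args); err != nil {
		return false, err
	}
	return *mock, nil
}

func main() {
	// No flag: the default applies. "-mock-broker=0": the real broker is requested.
	for _, args := range [][]string{nil, {"-mock-broker=0"}} {
		mock, err := parseMockBroker(args)
		fmt.Println(mock, err)
	}
}
```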

Changelog

Check releases page.

How to upgrade

From v1 to v2

  • New() returns *MessageQueue instead of the interface.

  • Minimal Go version updated to 1.16.

From version 0.x to 1.x

  • GetConsumer() method was renamed to Consumer() to follow Go naming guidelines.

  • GetProducer() method was removed. Use AsyncProducer() instead, or SyncProducer() if you want to catch network errors yourself.

Epilogue

Feel free to create issues with bug reports or your wishes.

go-mq's People

Contributors

cheshir, dependabot[bot], doranych, gustavosbarreto, zhangyangchen

go-mq's Issues

Read all messages in a queue

Is there an established method to grab all the messages in a certain queue? I don't want to reinvent the wheel, and I wasn't sure where else to ask. Thanks

Fix CI

GitHub Actions started failing without any changes to the code.

Priority

If I add queue priority information to the config file:
"options": { "durable": true, "args": { "x-max-priority": 9 } }

I get this error in the console:
panic: interface conversion: interface {} is map[string]interface {}, not map[interface {}]interface {}

How should priority be written in the config file?
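
The panic above is what an unchecked type assertion produces when the decoded value has a different map type than expected. In Go, encoding/json decodes objects into map[string]interface{}, while some YAML decoders (gopkg.in/yaml.v2, for example) produce map[interface{}]interface{}. A minimal sketch of accepting both shapes follows; toStringMap is a hypothetical helper and is not presented as the library's fix.

```go
package main

import "fmt"

// toStringMap accepts either map shape produced by common decoders and
// returns a map keyed by strings, or an error for anything else.
func toStringMap(v interface{}) (map[string]interface{}, error) {
	switch m := v.(type) {
	case map[string]interface{}:
		return m, nil
	case map[interface{}]interface{}:
		out := make(map[string]interface{}, len(m))
		for k, val := range m {
			key, ok := k.(string)
			if !ok {
				return nil, fmt.Errorf("non-string key %v", k)
			}
			out[key] = val
		}
		return out, nil
	default:
		return nil, fmt.Errorf("unsupported args type %T", v)
	}
}

func main() {
	fromJSON := map[string]interface{}{"x-max-priority": 9}
	fromYAML := map[interface{}]interface{}{"x-max-priority": 9}

	for _, args := range []interface{}{fromJSON, fromYAML, "oops"} {
		m, err := toStringMap(args)
		fmt.Println(m, err)
	}
}
```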

mq.channel.NotifyClose invalid

I called message.Ack(true) twice.
The program does not report an error.
I monitored mq.channel.NotifyClose, but it is not triggered.

Extra arguments for a queue

How to pass extra arguments for a queue?

I tried the following configuration without success:

queues:
    - name: "uh.traffic.limit_workers"
      exchange: "uh.api.events"
      routing_key: "namespaces_activity"
      options:
        durable: true
        args:
          'x-max-priority': 9

Cluster connection

Suggestion

Create cluster connection
For backward compatibility, a list of cluster nodes can be passed as a single comma-separated string:

dsn:  amqp://login:password@host1:port/virtual_host,amqp://login:password@host2:port/virtual_host,amqp://login:password@host3:port/virtual_host

The same scheme is typically used to describe a Kafka cluster connection.

Discuss

You always have a RabbitMQ cluster, even if there is only one node in it, right?
I think it's possible to iterate over the cluster nodes before reconnecting to the same node.
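
A rough sketch of the suggestion, using only the standard library: split the comma-separated DSN into node URLs and rotate through them on reconnect. splitDSN and nextNode are hypothetical helpers, the hosts and ports are made up for the demo, and the approach assumes credentials contain no commas.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
	"strings"
)

// splitDSN breaks a comma-separated DSN into validated node URLs.
func splitDSN(dsn string) ([]string, error) {
	var nodes []string
	for _, part := range strings.Split(dsn, ",") {
		part = strings.TrimSpace(part)
		if part == "" {
			continue
		}
		u, err := url.Parse(part)
		if err != nil {
			return nil, err
		}
		if u.Scheme != "amqp" && u.Scheme != "amqps" {
			return nil, fmt.Errorf("unexpected scheme %q in %q", u.Scheme, part)
		}
		nodes = append(nodes, part)
	}
	if len(nodes) == 0 {
		return nil, errors.New("no nodes in DSN")
	}
	return nodes, nil
}

// nextNode returns the index to try after a failure on nodes[current] (round robin).
func nextNode(nodes []string, current int) int {
	return (current + 1) % len(nodes)
}

func main() {
	dsn := "amqp://login:password@host1:5672/vhost,amqp://login:password@host2:5672/vhost"

	nodes, err := splitDSN(dsn)
	if err != nil {
		panic(err)
	}

	// Walk the nodes as a reconnect loop would after each failed attempt.
	current := 0
	for i := 0; i < 3; i++ {
		fmt.Println(nodes[current])
		current = nextNode(nodes, current)
	}
}
```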

Add sync producer

This kind of producer is useful for catching producer errors immediately.

Tests for reconnection break randomly

Steps to reproduce:
Run: go test -race -mock-broker=1

Expected result:

PASS
ok  	github.com/cheshir/go-mq	2.318s

Actual result:

--- FAIL: TestMq_Reconnect (0.32s)
	mq_test.go:294: Consumer did not read messages. Produced 2, read 0

Notes:
The test works only with the mocked broker.
Can't reproduce the bug with the debugger enabled or with the -run=Reconnect flag.

Enable Mocking/Testing by exporting brokerIsMocked var ?

Current problem: I'd like to use the go-mq library within a service to connect to an MQ. To test that the service works, I want to use a fake server.

Going through the code, it seems that the variable "brokerIsMocked" must be set to true to allow a connection to the fake server. I can't do this from another package, obviously.

Am I missing something? Is there another way to connect to a fake server, such as a flag in mq.Config?

Kind regards !

PS: Nice work with the lib - really like it.

Unexpected consumer behavior

Consumers consume messages without an explicit call to the Consumer.Consume method.

For example, we have two binaries, one with a producer and another with a consumer, based on the same code and the same config.

  1. run the producer (without consuming)
  2. run the consumer
  3. produce several messages to the AMQP server
  4. the consumer consumes several messages
  5. stop the producer
  6. the consumer gets messages from nowhere (of course we produced them before; they were just stuck in a consumer that should not exist in the producer binary)

In my case, RabbitMQ shows two consumers.

Given this, messages are not supposed to be consumed without a real Consumer.Consume call.

If you can point me to where I should dig, I might be able to fix this.
