Giter Club home page Giter Club logo

pubsub's Introduction

logo

Actions Status

PubSub provides a simple helper library for doing publish and subscribe style asynchronous tasks in Go, usually in a web or micro service. PubSub allows you to write publishers and subscribers, fully typed, and swap out providers (Google Cloud PubSub, AWS SQS etc) as required.

PubSub also abstracts away the creation of the queues and their subscribers, so you shouldn't have to write any cloud specific code, but still gives you options to set concurrency, deadlines, error handling etc.

Middleware is also included, including logging, tracing and error handling!

Table of Contents

Example

Here's a basic example using Nats streaming server and a basic subscriber function that prints hello.

To publish messages, you can call Publish, you can publish a Protobuf or JSON serializable object (i.e, most Go objects).

Publish is Protobuf by default..

Publisher

pubsub.Publish(ctx, "topic-name", &User{Id: "usr_0001"})

to publish a JSON object

pubsub.PublishJSON(ctx, "topic-name", &User{Id: "usr_0001"})

This can be useful if the application subscribing isn't good with Protobuf or is external to your company for example. However Protobuf is recommended for speed, type safety and forwards compatability.

Subscriber

Subscribing to a topic is done with a single function, you'll receive a context, the object that was in the queue and the pubsub message, which includes some metadata and timing information, should you need it.

func PrintHello(ctx context.Context, msg *HelloMsg, m *pubsub.Msg) error {
	fmt.Printf("Message received %+v\n\n", m)

	fmt.Printf(msg.Greeting + " " + msg.Name + "\n")

	return nil
}

First though, you need to "Setup" your subscribers

type Subscriber struct{}

func (s *Subscriber) Setup(c *pubsub.Client) {
	c.On(pubsub.HandlerOptions{
		Topic:   HelloTopic,
		Name:    "print-hello",
		Handler: PrintHello,
		AutoAck: true,
		JSON:    true,
	})
}

pubsub.Subscribe(&Subscriber{})

Full Example

You can see a full example in the example folder.

Middleware

Default

PubSub provides a helper to setup the default middleware.

At the time of writing this includes, Logrus, Opentracing, Prometheus, Recovery (Handles panics) and Audit Logging

To use this, simple include it when initialising PubSub

pubsub.SetClient(&pubsub.Client{
	ServiceName: "my-service-name",
	Provider:    provider,
	Middleware:  defaults.Middleware,
})

You can optionaly provide a recovery handler too.

pubsub.SetClient(&pubsub.Client{
	ServiceName: "my-service-name",
	Provider:    provider,
	Middleware:  defaults.MiddlewareWithRecovery(func(p interface{}) (err error){
		// log p or report to an error reporter
	}),
})

Logrus

When enabled, the Logrus middleware will output something similar to below. Note that the level is DEBUG by default. To see the logs, you'll need to set logrus.SetLevel(logrus.DebugLevel) or use something like github.com/lileio/Logr which can set it from ENV variables.

time="2019-09-23T12:46:13Z" level=debug msg="Google Pubsub: Publishing"
time="2019-09-23T12:46:13Z" level=debug msg="Google Pubsub: Publish confirmed"
time="2019-09-23T12:46:13Z" level=debug msg="Published PubSub Msg" component=pubsub duration=143.545203ms metadata="map[x-b3-parentspanid:622cff2be9102141 x-b3-sampled:1 x-b3-flags:0 x-audit-user:[email protected] x-b3-traceid:4275176f2f7f729257887d1e4853498d x-b3-spanid:017e28147c6c3704]" topic=hello.world
time="2019-09-23T12:46:16Z" level=debug msg="Processed PubSub Msg" component=pubsub duration=1.702259988s handler=function_name id=734207593944188 metadata="map[x-b3-traceid:4275176f2f7f729257887d1e4853498d x-b3-spanid:017e28147c6c3704 x-audit-user:[email protected] x-b3-parentspanid:622cff2be9102141 x-b3-flags:0 x-b3-sampled:1]" topic=hello.work

Opentracing

The Opentracing middle adds tags ands logs to spans which will later be sent to something like Zipkin or Jaeger when setup in the application. Note that the Opentracing middleware only adds things to the context but isn't responsible for setting up Opentracing and it's reporting, for that, see here.

Prometheus

The Prometheus middleware includes some counters and histograms to help with monitoring, you can see there here but these include.

pubsub_message_published_total{topic,service}
pubsub_outgoing_bytes{topic,service}
pubsub_publish_durations_histogram_seconds
pubsub_server_handled_total{"topic", "service", "success"}
pubsub_incoming_bytes{"topic", "service"}
pubsub_subscribe_durations_histogram_seconds{"topic", "service"}

Here's an example query to get messages handled (by a subscriber) every minute, make sure your prometheus step is also 1m

sum(increase(pubsub_server_handled_total[1m])) by (topic, success)

Providers

Google Cloud PubSub

To setup a Google Cloud client, you can do the following..

pubsub.SetClient(&pubsub.Client{
	ServiceName: "my-service-name",
	Provider:    google.NewGoogleCloud('projectid'),
	Middleware:  defaults.Middleware,
})

If you're on Google Cloud vms you're environment likely already has credentials setup, but locally you can set them up with default credentials, if you're on Kubernetes, I reccomend setting up service account and then making a secret file and setting the GOOGLE_APPLICATION_CREDENTIALS to the filepath of that JSON secret key.

The Google PubSub provider is tested heavily in production by Echo and works well, we have however noticed some strange behaviour from Google subscribers, as they try to be clever and balance traffic and other strange things. For example, if you want to only process 2 messages at a time, and don't process the two you're given, then can often result in a pause before more messages are sent to you, this can be hard to debug as a queue builds up, but often fixes itself.

Nats Streaming Server

To setup a Nats Streaming client, you can do the following. Optionally passing options for the original client

pubsub.SetClient(&pubsub.Client{
	ServiceName: "my-service-name",
	Provider:    nats.NewNats('clustername', opts),
	Middleware:  defaults.Middleware,
})

Note this driver is for Nats Streaming, and not for plain Nats.

AWS SQS/SNS

Currently there is no provider for AWS SNS and SQS. Please feel free to make a pull request!

Kafka

There's an experimental provider for Kafka available here, but it's limiting in options you can override. I'd love to see someone take this on and help it become more bullet proof. But things like retries are hard.

pubsub's People

Contributors

arbarlow avatar haswalt avatar jesushernandez avatar jonbretman avatar mahboubii avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar

pubsub's Issues

How do you unsubscribe?

Can't see in the handler/subscriber or is it only possible to unsubscribe with the Shutdown() ?

Working example with Memory Provider

Could you consider adding a working example using the Memory Provider? Preferably one with includes the subscriber and how to set up with components to work together.

Logging Middleware

This should be easily configurable via options, to allow, info, debug, warn, error etc by default

NATS and liftbridge

There is a new golang project that is very similar to Kafka.
It uses NATS and it's called liftbridge.

I was wondering about integrating it with this and your grpc generator.
But does the grpc generator allow to gen the pub sub middleware ?

Potential shutdown subscriptions leaks

I notice the subscribe client when being closed/shutdown, it will loop a map[string]subscription and wait to close each subscription. How about if we subscribe to the same topic a couple of times in one app which could be possibly done through different modules? Then we will override the subscription in the map with the same topic and we won't gracefully shutdown.

type struct subscriptionEntry {
topic string
subscription *stan.Subscription
}

A slice []subscriptionEntryt might be good enough instead of a map.

How to do unsubscribe? I know in most cases, we do not need to.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.