eventbus's People

Contributors

asaskevich, bennah, briandowns, crijonsi, dnathe4th, dominikschulz, erfanio, erickskrauch, hagii, lookfirst, pgermishuys, tetratorus, xmxiaoq

eventbus's Issues

Feature request: subscription by mask

Something like

bus.SubscribeAsync("content:*", func() {}, true)

It should react to "content:created", "content:deleted", etc.

Also, in this case it would be nice if the subscriber received an "event" parameter.
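
For illustration, here is a minimal sketch of how mask matching could behave. It is not part of EventBus: maskBus and its methods are hypothetical names, and the standard library's path.Match is used for the glob, which is only one possible choice. Each handler receives the concrete topic as its "event" parameter.

```go
package main

import (
	"fmt"
	"path"
	"sync"
)

// maskBus is a hypothetical wrapper, not an EventBus type. It stores
// handlers under glob patterns and passes the concrete topic to each one.
type maskBus struct {
	mu       sync.Mutex
	handlers map[string][]func(event string)
}

func newMaskBus() *maskBus {
	return &maskBus{handlers: make(map[string][]func(event string))}
}

// Subscribe registers fn for every topic matching pattern (path.Match syntax).
func (b *maskBus) Subscribe(pattern string, fn func(event string)) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.handlers[pattern] = append(b.handlers[pattern], fn)
}

// Publish calls every handler whose pattern matches topic.
func (b *maskBus) Publish(topic string) {
	b.mu.Lock()
	var matched []func(event string)
	for pattern, fns := range b.handlers {
		// A malformed pattern yields an error and is treated as no match.
		if ok, err := path.Match(pattern, topic); err == nil && ok {
			matched = append(matched, fns...)
		}
	}
	b.mu.Unlock()
	for _, fn := range matched {
		fn(topic) // handlers run outside the lock
	}
}

func main() {
	bus := newMaskBus()
	bus.Subscribe("content:*", func(event string) { fmt.Println("got", event) })
	bus.Publish("content:created")
	bus.Publish("content:deleted")
	bus.Publish("user:created") // does not match, nothing is printed
}
```

Note that path.Match treats "/" specially, so topics containing slashes would need a different matcher.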

PublishAsync

Hi,

Considering the callback func slowCalculator, the way PublishAsync works using Publish means that if multiple events are published asynchronously to the same topic, each subsequent goroutine has to wait for the earlier goroutine callback to finish before executing.

This may be ideal in some scenarios where there may be contention if the callbacks are run in two separate go routines (transactional behaviour).

However, this may not be ideal where there is no contention if a given callback runs more than once concurrently.

The idea tested for this scenario is to

  • lock
  • do all the necessary map operations
  • unlock
  • execute the handler

This will allow each subsequent goroutine callback to gain access to the lock almost instantly and allow the same callback to run multiple times concurrently.
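
The lock, copy, unlock, execute sequence described above can be sketched as follows. This is a simplified stand-in rather than the library's code: miniBus and its fields are hypothetical, and handlers are plain func(int) values instead of reflect.Value.

```go
package main

import (
	"fmt"
	"sync"
)

// miniBus is a hypothetical stand-in for the bus.
type miniBus struct {
	mu       sync.Mutex
	handlers map[string][]func(int)
}

func (b *miniBus) Subscribe(topic string, fn func(int)) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.handlers[topic] = append(b.handlers[topic], fn)
}

// Publish holds the lock only for the map lookup and the copy.
func (b *miniBus) Publish(topic string, arg int) {
	b.mu.Lock()
	snapshot := append([]func(int){}, b.handlers[topic]...)
	b.mu.Unlock()

	// Handlers run without the lock, so concurrent publishers do not
	// wait for each other and a handler may call back into the bus.
	for _, fn := range snapshot {
		fn(arg)
	}
}

func main() {
	bus := &miniBus{handlers: make(map[string][]func(int))}
	bus.Subscribe("calc", func(n int) { fmt.Println("handled", n) })

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			bus.Publish("calc", n)
		}(i)
	}
	wg.Wait()
}
```

The trade-off is that a handler removed while a publish is in flight may still be called once from the snapshot.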

As part of a slightly bigger change, I was thinking of a new struct to represent a handler that contains

  • the callback as reflect.Value
  • flagOnce
  • async bool, to determine whether the callback should run asynchronously
  • transactional bool, to determine whether to use the default behaviour or allow the new behaviour

Additionally:

  • PublishAsync will be unexported and Publish will be the general purpose method for publishing events.
  • SubscribeAsync will be implemented to set the behaviour for async callbacks

The downside to this approach is having 3 extra bools for each handler. The advantage is only one map will be needed.

I'm hoping for some feedback, what you think works, what doesn't or any suggestions related to this change.

unsubscribe and closures

Unsubscribe is broken with closures. This code shows the problem:

makeHandler := func(tag string) func(msg string) {
	return func(msg string) {
		fmt.Printf("%s %s\n", tag, msg)
	}
}

var bus EventBus.Bus = EventBus.New()

handler1 := makeHandler("handler1")
fmt.Printf("handler1 pointer %x\n", reflect.ValueOf(handler1).Pointer())
bus.Subscribe("foo", handler1)

handler2 := makeHandler("handler2")
fmt.Printf("handler2 pointer %x\n", reflect.ValueOf(handler2).Pointer())
bus.Subscribe("foo", handler2)

bus.Publish("foo", "A")
bus.Unsubscribe("foo", handler2)
bus.Publish("foo", "B")

Here's the output:

handler1 pointer 11ac100
handler2 pointer 11ac100
handler1 A
handler2 A
handler2 B

Note that even though we removed handler2, it still received the B event.

What's happening is that EventBus uses reflect.ValueOf().Pointer() for Unsubscribe. However, the pointer to a function isn't enough to distinguish one closure from another. You can see the problem in the above output where it shows that the pointer values for handler1 and handler2 are the same.
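
One possible way around this, shown only as a sketch, is to identify subscriptions by an ID returned from Subscribe instead of by function pointer. The idBus type below is hypothetical and is not the EventBus API; it also omits locking to stay short.

```go
package main

import "fmt"

// idBus is a hypothetical sketch: handlers are keyed by a numeric ID
// rather than by reflect.ValueOf(fn).Pointer(). Not safe for concurrent use.
type idBus struct {
	nextID   int
	handlers map[string]map[int]func(string)
}

func newIDBus() *idBus {
	return &idBus{handlers: make(map[string]map[int]func(string))}
}

// Subscribe stores fn and returns the ID needed to remove it later.
func (b *idBus) Subscribe(topic string, fn func(string)) int {
	if b.handlers[topic] == nil {
		b.handlers[topic] = make(map[int]func(string))
	}
	b.nextID++
	b.handlers[topic][b.nextID] = fn
	return b.nextID
}

// Unsubscribe removes exactly the subscription with the given ID.
func (b *idBus) Unsubscribe(topic string, id int) {
	delete(b.handlers[topic], id)
}

// Publish calls the handlers of topic; map iteration order is unspecified.
func (b *idBus) Publish(topic, msg string) {
	for _, fn := range b.handlers[topic] {
		fn(msg)
	}
}

func main() {
	makeHandler := func(tag string) func(string) {
		return func(msg string) { fmt.Printf("%s %s\n", tag, msg) }
	}
	bus := newIDBus()
	bus.Subscribe("foo", makeHandler("handler1"))
	id2 := bus.Subscribe("foo", makeHandler("handler2"))

	bus.Unsubscribe("foo", id2)
	bus.Publish("foo", "B") // only handler1 remains
}
```

With IDs, two closures created from the same function literal are distinct subscriptions, which is the property the pointer comparison cannot provide.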

Unsubscribe middle closure function

import (
	"github.com/asaskevich/EventBus"
	"github.com/stretchr/testify/require"
	"testing"
)

func TestAsaskevichBus_UnsubscribeClosureMiddle(t *testing.T) {

	callCount := 0
	sum := 0
	makeFn := func(a int) func() {
		return func() {
			callCount++
			sum += a
		}
	}
	const evName = "ev1"
	evBus := EventBus.New()
	f1 := makeFn(11)
	f2 := makeFn(22)
	f3 := makeFn(33)
	require.NoError(t, evBus.Subscribe(evName, f1))
	require.NoError(t, evBus.Subscribe(evName, f2))
	require.NoError(t, evBus.Subscribe(evName, f3))
	//
	require.NoError(t, evBus.Unsubscribe(evName, f2))
	//
	evBus.Publish(evName)
	require.Equal(t, callCount, 2)
	require.Equal(t, sum, 11+33)

}

like #47

`Lock` acquired when publishing an event from a subscriber.

Current behaviour

The main function publishes event1, and the event1 handler publishes event2, but the event2 handler is never called and the program does not exit.

bus.Subscribe("event2", func() {
	fmt.Println("event2")
})
bus.Subscribe("event1", func(bus EventBus.Bus) {
	fmt.Println("event1")
	time.Sleep(time.Second * 5)
	bus.Publish("event2")
})
bus.Publish("event1", bus)

Further debugging

  • I saw that Subscribe locks the bus, so whenever Publish already holds the lock we cannot lock the bus, and the same applies to SubscribeAsync.
    Is that the default behaviour?

Basic Server Client Example not Working.

Server main.go

package main

import (
	evbus "github.com/asaskevich/EventBus"
)

func main() {
	server := evbus.NewServer(":8090", "/server_bus", evbus.New())
	server.Start()
	// ...
	server.EventBus().Publish("main:calculator", 4, 6)
}

Client main.go

package main

import (
	"fmt"

	evbus "github.com/asaskevich/EventBus"
)

func calculator(a int, b int) {
	fmt.Printf("%d\n", a+b)
}

func main() {
	client := evbus.NewClient(":8089", "/_server_bus", evbus.New())
	client.Start()
	client.Subscribe("main:calculator", calculator, ":8090", "/server_bus")
}

After running the server code I try to run the client and I get this in the console ->
Server not found - runtime error: invalid memory address or nil pointer dereference

Can someone please tell me what I am doing wrong?

Race conditions in async transactional handlers

I detected race conditions in SubscribeAsync with transactional flag. It is due to the fact that we lock the handler after and not before spawning the async handler goroutine. On a fast machine it causes unpredictable behavior. Preparing a PR.

Potential deadlock due to calling callbacks while holding a lock

See golang mutexes aren't recursive / re-entrant

The Bus.Publish(...) method locks a mutex and calls the callbacks while holding it, so any access to the bus from a callback will cause a deadlock.

The following example reproduces the issue:

package main

import (
	"fmt"

	"github.com/asaskevich/EventBus"
)

var bus EventBus.Bus

func showbug(a int, b int) {
	fmt.Printf("%d\n", a+b)

	if a == 20 {
		bus.Publish("main:calculator", a+1, b)
	}
}

func main() {
	bus = EventBus.New()
	bus.Subscribe("main:calculator", showbug)
	bus.Publish("main:calculator", 20, 40)
	bus.Unsubscribe("main:calculator", showbug)
}

We use another implementation of eventbus (inspired by yours) where this issue was fixed: github.com/ispringteam/goeventbus (see copySubscriptions method and nextID field)

Feel free to adapt our solution or introduce another one ;)

Stuck when trying to send response back to message Bus

Hi,
I have modified the example because I wanted to send the calculator's result back onto the bus, on a different topic. I would like to chain different functions that react to events down a chain. When I start the program with "go run", the process gets stuck and nothing happens.

package main

import (
	"fmt"
	"time"

	"github.com/asaskevich/EventBus"
)

var bus EventBus.Bus

func calculator1(a int, b int) {
	fmt.Printf("calc1: %d\n", a+b)
	bus.Publish("print", "calc1 calculated : %d\n", a+b)
}

func printer(s string) {
	fmt.Println(s)
}

func main() {
	bus = EventBus.New()
	bus.Subscribe("calc", calculator1)
	bus.Subscribe("print", printer)
	sum := 1
	for sum < 10 {
		fmt.Println(sum)
		bus.Publish("calc", sum, sum)
		time.Sleep(1000 * time.Millisecond)
		sum += 1
	}
	bus.Unsubscribe("calc", calculator1)
	bus.Unsubscribe("print", printer)
}

Output

> go run poc3.go
1
calc1: 2
^Csignal: interrupt

I am on linux 64 bit.

Two subscribers?

Looking at the source:

func (bus *EventBus) Subscribe(topic string, fn interface{}) error {
    ...
    bus.handlers[topic] = &eventHandler{
        v, false, false, false, sync.Mutex{},
    }
    ...
}

Does the bus support only one subscriber per topic?
If I add a second subscriber, it will overwrite the first.

There is no release version available

Hello, Thank you for this beautifully simple and functional library. Can you please release a version so that we can use it instead of master? This helps in shielding our projects from changes in master branch.

SubscribeAsync is not stable currently

var (
	bus = EventBus.New()
)

// Pub publishes the given interface to any listeners for that interface.
func Pub(topic string, data ...interface{}) {
	bus.Publish(topic, data...)
}

// Sub subscribes to specific interfaces with the specified callback
// function.
func Sub(topic string, fn interface{}) error {
	//return bus.SubscribeAsync(topic, fn, true)
	return bus.Subscribe(topic, fn)
}

func Sub2(topic string, fn interface{}) error {
	return bus.SubscribeAsync(topic, fn, true)
	//return bus.Subscribe(topic,fn);
}

Subscribing to events with Sub2 works, but sometimes, when an error occurs, it stays broken forever.

Improve testing

Most of the tests have several fail conditions that are unrecognizable (tests do not log which of the conditions failed). I suggest we use some kind of assertion lib (testify is a good pick) to improve readability.

Use of reflect

There are multiple places where reflect is being used. Wouldn't that cause performance issues?

Indexing error due to multiple removeHandler

At line 141, in func Publish:

// Publish executes callback defined for a topic. Any additional argument will be transferred to the callback.
func (bus *EventBus) Publish(topic string, args ...interface{}) {
	bus.lock.Lock() // will unlock if handler is not found or always after setUpPublish
	defer bus.lock.Unlock()
	if handlers, ok := bus.handlers[topic]; ok && 0 < len(handlers) {
		// Handlers slice may be changed by removeHandler and Unsubscribe during iteration,
		// so make a copy and iterate the copied slice.
		copyHandlers := make([]*eventHandler, len(handlers))
		copy(copyHandlers, handlers)
		for i, handler := range copyHandlers {
			if handler.flagOnce {
				bus.removeHandler(topic, i) // multiple operation causes indexing error
			}
			if !handler.async {
				bus.doPublish(handler, topic, args...)
			} else {
				bus.wg.Add(1)
				if handler.transactional {
					bus.lock.Unlock()
					handler.Lock()
					bus.lock.Lock()
				}
				go bus.doPublishAsync(handler, topic, args...)
			}
		}
	}
}

i and handler range over copyHandlers, while the remove operation acts on bus.handlers. 🤨
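
A minimal sketch of one way to avoid the stale index: remove the handler by identity from the live slice instead of by the index taken from the copy. The handler type and this removeHandler signature are hypothetical and differ from the library's.

```go
package main

import "fmt"

// handler is a hypothetical, reduced version of eventHandler.
type handler struct {
	name string
	once bool
}

// removeHandler deletes h from hs by identity, so it does not depend on
// an index computed against a different (copied) slice.
func removeHandler(hs []*handler, h *handler) []*handler {
	for i, cur := range hs {
		if cur == h {
			return append(hs[:i], hs[i+1:]...)
		}
	}
	return hs
}

func main() {
	live := []*handler{{"a", true}, {"b", false}, {"c", true}}
	snapshot := append([]*handler(nil), live...)

	// With index-based removal, deleting index 0 shifts the live slice,
	// and index 2 (for "c") no longer exists in it.
	for _, h := range snapshot {
		if h.once {
			live = removeHandler(live, h)
		}
	}
	for _, h := range live {
		fmt.Println(h.name) // prints "b"
	}
}
```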

Code crash when publishing in a case of server/client eventBus

Hi!
My issue is the following:
I'm currently implementing a connection between two different programs on the same computer, so I have a server and a client.
I run the server and then the client. The server then publishes events, which are read and executed by the client.
But when I stop the client's code, the server code crashes.

After tests and investigation I found that the subscribers map (map[string][]*SubscribeArg) of the server object is not updated when the client disconnects. Even if the client is no longer running, the subscribers object stays the same.

So when I call server.HasClientSubscribed(&theArgs), it always returns true, even after the client has disconnected.

My workaround is to use:
rpcClient, _ := rpc.DialHTTPPath("tcp", ":2010", "/_client_bus_")
and check that it is not nil before publishing.

Is there a way to update this map? Is there any other solution that avoids the crash?
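
The workaround above can be written so that the dial error is checked rather than discarded. This is only a sketch: clientReachable is a hypothetical helper, and the address and path are the ones quoted in the workaround.

```go
package main

import (
	"fmt"
	"net/rpc"
)

// clientReachable reports whether an RPC endpoint currently accepts
// connections. It is a hypothetical helper, not part of EventBus.
func clientReachable(addr, path string) bool {
	client, err := rpc.DialHTTPPath("tcp", addr, path)
	if err != nil {
		return false // nothing is listening, or the handshake failed
	}
	client.Close()
	return true
}

func main() {
	if !clientReachable(":2010", "/_client_bus_") {
		fmt.Println("client is gone, skipping publish")
		return
	}
	fmt.Println("client is up, safe to publish")
}
```

This narrows the window but does not close it: the client can still disappear between the check and the publish, so it does not replace cleaning up the subscribers map.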

Best regards !

Problems/limitations with network busses

I want to bridge two busses over the network, giving the illusion of a single bus. This isn't currently possible as it deadlocks (and also you'll end up with an infinite loop of duplicate events).

The simple program below will show the deadlock:

package main

import (
	"fmt"
	"time"

	"github.com/asaskevich/EventBus"
)

func main() {

	networkBusA := EventBus.NewNetworkBus(":2035", "/_net_bus_A")
	networkBusA.Start()

	networkBusB := EventBus.NewNetworkBus(":2030", "/_net_bus_B")
	networkBusB.Start()

	networkBusA.Subscribe("topic-A", func(a int) { fmt.Println("A handler:", a) }, ":2030", "/_net_bus_B")

	networkBusB.Subscribe("topic-A", func(a int) { fmt.Println("B handler:", a) }, ":2035", "/_net_bus_A")

	fmt.Println("Publishing on A...")
	networkBusA.EventBus().Publish("topic-A", 10)
	fmt.Println("Done.")

	time.Sleep(2 * time.Second)

	networkBusA.Stop()
	networkBusB.Stop()
}

This is similar to #25, with a slightly different cause.

  • networkBusA.EventBus().Publish("topic-A", 10) causes the networkBusA lock to be taken
  • The rpc callback for networkBusB is called to publish the event there.
  • This goes through networkBusB's normal Publish() path
    • Which will call the rpc callback for networkBusA, and now we're back trying to take the networkBusA lock again.

Avoiding the deadlock by spawning the Publish() in a goroutine shows the infinite loop (run the same program as above):

diff --git a/client.go b/client.go
index a9e9e69..831431a 100644
--- a/client.go
+++ b/client.go
@@ -116,7 +116,7 @@ type ClientService struct {
 
 // PushEvent - exported service to listening to remote events
 func (service *ClientService) PushEvent(arg *ClientArg, reply *bool) error {
-	service.client.eventBus.Publish(arg.Topic, arg.Args...)
+	go service.client.eventBus.Publish(arg.Topic, arg.Args...)
 	*reply = true
 	return nil
 }

I'm looking for any suggestions on how to fix this, or perhaps just a definition of how network busses should work (e.g. if there's multiple network clients subscribed to a bus, should events received over the network be propagated to them?)

Just avoiding sending an event back to the client that published it seems like OK behaviour, but I'm not entirely sure how to plumb that in - Publish() probably needs to gain an understanding of where a publish request came from, and handlers need to know where they are going to
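
To make the "do not send it back to where it came from" idea concrete, here is an in-process sketch with no networking at all. Everything in it is an assumption for illustration: the peer type, the origin field, and the rule that only the bus where an event first entered forwards it.

```go
package main

import "fmt"

// event carries the ID of the bus it first entered through. The origin
// field is an assumption of this sketch, not an EventBus feature.
type event struct {
	topic  string
	origin string
	arg    int
}

// peer stands in for one network bus and the remote peers subscribed to it.
type peer struct {
	id        string
	remotes   []*peer
	delivered []int // arguments handed to local handlers
}

// publish delivers locally, then forwards only if the event started here.
func (p *peer) publish(ev event) {
	if ev.origin == "" {
		ev.origin = p.id // first hop: stamp the event
	}
	p.delivered = append(p.delivered, ev.arg)
	if ev.origin != p.id {
		return // arrived from another bus: do not forward again
	}
	for _, r := range p.remotes {
		r.publish(ev)
	}
}

func main() {
	a := &peer{id: "A"}
	b := &peer{id: "B"}
	a.remotes = []*peer{b}
	b.remotes = []*peer{a}

	a.publish(event{topic: "topic-A", arg: 10})
	fmt.Println(a.delivered, b.delivered) // prints [10] [10]
}
```

Whether events received over the network should be relayed to other network subscribers is exactly the open question above; this sketch answers it with "no", which is the simplest rule that terminates.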

Introduce bus interface instead of struct

Hello,

Could we replace structs with interfaces to facilitate testing of bus-dependent services? This would also provide an elegant way to implement the network bus: it would simply be another implementation of the interface. I'm working on a corresponding PR.
It would be useful to tag releases as well as more and more people use dependency management tools.

Cheers,
Michal
