asaskevich / eventbus Goto Github PK
View Code? Open in Web Editor NEW[Go] Lightweight eventbus with async compatibility for Go
License: MIT License
[Go] Lightweight eventbus with async compatibility for Go
License: MIT License
Something like
bus.SubscribeAsync("content:*", func() {}, true)
And it should react on "content:created" and "content:deleted", etc.
Also, for this case - will be cool if there will be "event" parameter, for subscriber.
Hi,
Considering the callback func slowCalculator, the way PublishAsync works using Publish means that if multiple events are published asynchronously to the same topic, each subsequent goroutine has to wait for the earlier goroutine callback to finish before executing.
This may be ideal in some scenarios where there may be contention if the callbacks are run in two separate go routines (transactional behaviour).
However, this may not be ideal where there is no contention if a given callback runs more than once concurrently.
The idea tested for this scenario is to
This will allow each subsequent goroutine callback to gain access to the lock almost instantly and allow the same callback to run multiple times concurrently.
As part of a slightly bigger change, I was thinking of a new struct to represent a handler that contains
Additionally:
The downside to this approach is having 3 extra bools for each handler. The advantage is only one map will be needed.
I'm hoping for some feedback, what you think works, what doesn't or any suggestions related to this change.
Unsubscribe is broken with closures. This code shows the problem:
makeHandler := func(tag string) func(msg string) {
return func(msg string) {
fmt.Printf("%s %s\n", tag, msg)
}
}
var bus EventBus.Bus = EventBus.New()
handler1 := makeHandler("handler1")
fmt.Printf("handler1 pointer %x\n", reflect.ValueOf(handler1).Pointer())
bus.Subscribe("foo", handler1)
handler2 := makeHandler("handler2")
fmt.Printf("handler2 pointer %x\n", reflect.ValueOf(handler2).Pointer())
bus.Subscribe("foo", handler2)
bus.Publish("foo", "A")
bus.Unsubscribe("foo", handler2)
bus.Publish("foo", "B")
Here's the output:
handler1 pointer 11ac100
handler2 pointer 11ac100
handler1 A
handler2 A
handler2 B
Note that even though we removed handler2
, it still got an the B
event.
What's happening is that EventBus uses reflect.ValueOf().Pointer()
for Unsubscribe
. However, the pointer to a function isn't enough to distinguish one closure from another. You can see the problem in the above output where it shows that the pointer values for handler1 and handler2 are the same.
import (
"github.com/asaskevich/EventBus"
"github.com/stretchr/testify/require"
"testing"
)
func TestAsaskevichBus_UnsubscribeClosureMiddle(t *testing.T) {
callCount := 0
sum := 0
makeFn := func(a int) func() {
return func() {
callCount++
sum += a
}
}
const evName = "ev1"
evBus := EventBus.New()
f1 := makeFn(11)
f2 := makeFn(22)
f3 := makeFn(33)
require.NoError(t, evBus.Subscribe(evName, f1))
require.NoError(t, evBus.Subscribe(evName, f2))
require.NoError(t, evBus.Subscribe(evName, f3))
//
require.NoError(t, evBus.Unsubscribe(evName, f2))
//
evBus.Publish(evName)
require.Equal(t, callCount, 2)
require.Equal(t, sum, 11+33)
}
like #47
The main
function Publish an event1
and relative handler of event1
, publish event2
, but the event2
handler is not calling, and the program is not exited.
bus.Subscribe("event2", func() {
fmt.Println("event2")
})
bus.Subscribe("event1", func(bus EventBus.Bus) {
fmt.Println("event1")
time.Sleep(time.Second * 5)
bus.Publish("event2")
})
bus.Publish("event1", bus)
subscribe
we lock bus
, so whenever publish already locked, and in SubscribeAsync
we can't lock bus.I try but fail.
Is it possible to publish events in one golang program and subscribe to them from another?
I just had a look in client.go and found that there was commit dda45d4 to handle server start errors.
The same kind of fix applies on server.go too:
If listen() fails, there is no need to start
go http.Serve(l, nil)
Server main.go
`package main
import (
evbus "github.com/asaskevich/EventBus"
)
func main() {
server := evbus.NewServer(":8090", "/server_bus", evbus.New())
server.Start()
// ...
server.EventBus().Publish("main:calculator", 4, 6)
}`
Client main.go
`
package main
import (
"fmt"
evbus "github.com/asaskevich/EventBus"
)
func calculator(a int, b int) {
fmt.Printf("%d\n", a+b)
}
func main() {
client := evbus.NewClient(":8089", "/_server_bus", evbus.New())
client.Start()
client.Subscribe("main:calculator", calculator, ":8090", "/server_bus")
}
`
After running the server code I try to run the client and I get this in the console ->
Server not found - runtime error: invalid memory address or nil pointer dereference
Can someone please tell me what I am doing wrong?
I detected race conditions in SubscribeAsync with transactional flag. It is due to the fact that we lock the handler after and not before spawning the async handler goroutine. On a fast machine it causes unpredictable behavior. Preparing a PR.
See golang mutexes aren't recursive / re-entrant
Bus.Publish(...)
method locks mutex and calls callbacks under lock, so any access to the bus in callback will cause deadlock.
The following example reproduces issue:
package main
import (
"fmt"
"github.com/asaskevich/EventBus"
)
var bus EventBus.Bus
func showbug(a int, b int) {
fmt.Printf("%d\n", a+b)
if a == 20 {
bus.Publish("main:calculator", a+1, b)
}
}
func main() {
bus = EventBus.New()
bus.Subscribe("main:calculator", showbug)
bus.Publish("main:calculator", 20, 40)
bus.Unsubscribe("main:calculator", showbug)
}
We use another implementation of eventbus (inspired by yours) where this issue was fixed: github.com/ispringteam/goeventbus (see copySubscriptions method and nextID field)
Feel free to adapt our solution or introduce another one ;)
Hi,
I have modified the example and wanted to send the response of the calculator back on the bus, on a different topic. I would like to chain different functions to react on events down a chain. When I run "go run" process is stuck, nothing happens.
package main
import (
"fmt"
"github.com/asaskevich/EventBus"
"time"
)
var (
bus EventBus.Bus;
)
func calculator1(a int, b int) {
fmt.Printf("calc1: %d\n", a + b)
bus.Publish("print", "calc1 calculated : %d\n", a + b)
}
func printer(s string) {
fmt.Println(s)
}
func main() {
bus = EventBus.New();
bus.Subscribe("calc", calculator1);
bus.Subscribe("print", printer);
sum := 1
for sum < 10 {
fmt.Println(sum)
bus.Publish("calc", sum, sum);
time.Sleep(1000 * time.Millisecond)
sum += 1
}
bus.Unsubscribe("calc", calculator1);
bus.Unsubscribe("print", printer);
}
Output
> go run poc3.go
1
calc1: 2
^Csignal: interrupt
I am on linux 64 bit.
As I see sources -
func (bus *EventBus) Subscribe(topic string, fn interface{}) error {
...
bus.handlers[topic] = &eventHandler{
v, false, false, false, sync.Mutex{},
}
...
}
It bus support only one subscriber for each topic?
If I add second subscriber it will overwrite first.
Hello, Thank you for this beautifully simple and functional library. Can you please release a version so that we can use it instead of master? This helps in shielding our projects from changes in master branch.
var (
bus = EventBus.New()
)
// Pub publishes the given interface to any listeners for that interface.
func Pub(topic string, data ...interface{}) {
bus.Publish(topic, data...)
}
// Sub subscribes to specific interfaces with the specified callback
// function.
func Sub(topic string, fn interface{}) error {
//return bus.SubscribeAsync(topic, fn, true)
return bus.Subscribe(topic,fn);
}
func Sub2(topic string, fn interface{}) error {
return bus.SubscribeAsync(topic, fn, true)
//return bus.Subscribe(topic,fn);
}
the Sub2 to subsrible event works,but when sometimes miss error,it will fault forever.
Most of the tests have several fail conditions that are unrecognizable (tests do not log which of the conditions failed). I suggest we use some kind of assertion lib (testify is a good pick) to improve readability.
There are multiple places where reflect is being used. Wouldnt that cause performance issues?
In line 141 func Publish
// Publish executes callback defined for a topic. Any additional argument will be transferred to the callback.
func (bus *EventBus) Publish(topic string, args ...interface{}) {
bus.lock.Lock() // will unlock if handler is not found or always after setUpPublish
defer bus.lock.Unlock()
if handlers, ok := bus.handlers[topic]; ok && 0 < len(handlers) {
// Handlers slice may be changed by removeHandler and Unsubscribe during iteration,
// so make a copy and iterate the copied slice.
copyHandlers := make([]*eventHandler, len(handlers))
copy(copyHandlers, handlers)
for i, handler := range copyHandlers {
if handler.flagOnce {
bus.removeHandler(topic, i) // multiple operation causes indexing error
}
if !handler.async {
bus.doPublish(handler, topic, args...)
} else {
bus.wg.Add(1)
if handler.transactional {
bus.lock.Unlock()
handler.Lock()
bus.lock.Lock()
}
go bus.doPublishAsync(handler, topic, args...)
}
}
}
}
i
and handler
are ranged in copyHandlers
while remove operation actions in bus.handlers
. ๐คจ
Hi!
My issue is the following:
I'm currently implementing a connection between 2 different code into the same computer. So I have a server and a client.
I run the server and then the client. Then the server publish events which are read and execute by the client.
But when I stop the clients code the server code is crashing.
After test and investigations I found that the subscribers map[string][]*SubscribeArg
map from the server
object is not updated when the client is disconnected. If the client is not working any more the subscribers
object is always the same.
So when I'm using server.HasClientSubscribed(&theArgs)
its responding always true event after the disconnection of the client.
My workaround is to use:
rpcClient, _ := rpc.DialHTTPPath("tcp", ":2010", "/_client_bus_")
and check if it not nil
before to make a publish
Is there a solution in order to update this map? Is there any another solution in order to avoid the crash of the code?
Best regards !
I want to bridge two busses over the network, giving the illusion of a single bus. This isn't currently possible as it deadlocks (and also you'll end up with an infinite loop of duplicate events).
The simple program below will show the deadlock:
package main
import (
"fmt"
"time"
"github.com/asaskevich/EventBus"
)
func main() {
networkBusA := EventBus.NewNetworkBus(":2035", "/_net_bus_A")
networkBusA.Start()
networkBusB := EventBus.NewNetworkBus(":2030", "/_net_bus_B")
networkBusB.Start()
networkBusA.Subscribe("topic-A", func(a int) { fmt.Println("A handler:", a) }, ":2030", "/_net_bus_B")
networkBusB.Subscribe("topic-A", func(a int) { fmt.Println("B handler:", a) }, ":2035", "/_net_bus_A")
fmt.Println("Publishing on A...")
networkBusA.EventBus().Publish("topic-A", 10)
fmt.Println("Done.")
time.Sleep(2 * time.Second)
networkBusA.Stop()
networkBusB.Stop()
}
This is similar to #25, with a slightly different cause.
networkBusA.EventBus().Publish("topic-A", 20)
causes the networkBusA lock to be takenPublish()
path
Avoiding the deadlock by spawning the Publish() in a goroutine shows the infinite loop (run the same program as above):
diff --git a/client.go b/client.go
index a9e9e69..831431a 100644
--- a/client.go
+++ b/client.go
@@ -116,7 +116,7 @@ type ClientService struct {
// PushEvent - exported service to listening to remote events
func (service *ClientService) PushEvent(arg *ClientArg, reply *bool) error {
- service.client.eventBus.Publish(arg.Topic, arg.Args...)
+ go service.client.eventBus.Publish(arg.Topic, arg.Args...)
*reply = true
return nil
}
I'm looking for any suggestions on how to fix this, or perhaps just a definition of how network busses should work (e.g. if there's multiple network clients subscribed to a bus, should events received over the network be propagated to them?)
Just avoiding sending an event back to the client that published it seems like OK behaviour, but I'm not entirely sure how to plumb that in - Publish() probably needs to gain an understanding of where a publish request came from, and handlers need to know where they are going to
Hello,
Could we replace structs with interfaces to facilitate testing of bus-dependent services? This would also provide an elegant way to implement the network bus: it would simply be an another implementation of the interface. I'm working on a corresponding PR.
It would be useful to tag releases as well as more and more people use dependency management tools.
Cheers,
Michal
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.