Giter Club home page Giter Club logo

gookit / event Goto Github PK

View Code? Open in Web Editor NEW
483.0 18.0 57.0 144 KB

📢 Lightweight event manager and dispatcher implements by Go. Go实现的轻量级的事件管理、调度程序库, 支持设置监听器的优先级, 支持使用通配符来进行一组事件的监听

Home Page: https://pgk.go.dev/github.com/gookit/event

License: MIT License

Go 100.00%
event-management event-dispatcher event-listener gookit multiple-listeners eventbus events

event's People

Contributors

dependabot-preview[bot] avatar dependabot[bot] avatar inhere avatar kanpachi888 avatar lowitea avatar purevirtual avatar relicoftesla avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

event's Issues

使用自定义事件,并发会有问题

for {
		time.Sleep(1 * time.Second)
		go fire()
		go fire()
		go fire()
}
func fire() {
	//mu.Lock()
	defer func() {
		//mu.Unlock()
		if panicErr := recover(); panicErr != nil {
			fmt.Printf("xxxxx %s", panicErr)
			//logrus.Errorf("[Recovery from panic]", zap.Any("panicErr:", panicErr), zap.String("stack", string(debug.Stack())))
		}
	}()
	d := &listener.DisposeFile{
		DevName: "aaa",
		DevIp:   "xczxc",
	}
	e := &listener.MyEvent{DisposeFile: d}
	e.SetName("e3")
	event.AddEvent(e)
	event.Fire(e.Name(), nil)
}
type Manager struct {
	sync.Mutex
	// EnableLock enable lock on fire event.
	EnableLock bool
	// name of the manager
	name string
	// pool sync.Pool
	// is a sample for new BasicEvent
	sample *BasicEvent
	// storage user custom Event instance. you can pre-define some Event instances.
	events map[string]Event
	// storage all event name and ListenerQueue map
	listeners map[string]*ListenerQueue
	// storage all event names by listened
	listenedNames map[string]int
}

代码使用events map[string]Event 存放事件,存在并发写入map的问题

建议 Event Name 检查出错后返回 error,而不是直接 panic

目前库中,检查 Event Name 不符合规则后会直接 panic,逻辑隐蔽,处理粗暴,很容易被坑。

建议改成返回 error

相关代码见 util.go:

// goodName check event name is valid.
func goodName(name string, isReg bool) string {
	name = strings.TrimSpace(name)
	if name == "" {
		panic("event: the event name cannot be empty")
	}

	// on add listener
	if isReg {
		if name == AllNode || name == Wildcard {
			return Wildcard
		}
		if strings.HasPrefix(name, AllNode) {
			return name
		}
	}

	if !goodNameReg.MatchString(name) {
		panic(`event: name is invalid, must match regex:` + goodNameReg.String())
	}
	return name
}

How to register a subscriber ?

System (please complete the following information):

linux

Describe the bug

I don't understand how to register my subscriber.
I have follow the documentation but my listener registered in the subscriber are not called.
I suppose that i have to "init" the subscriber ? Or is it automatic ?

Maybe it's just that i don't understand how to use it.

Thanks in advance.

Channels on Fire func

The library is very good.

But I am interested in working under high load.

Have you thought about adding channel support to event.Fire()? That would be able to handle load peaks?

异步事件,不执行

event.Async("new.member", event.M{"memberId": member.ID, "superiorId": superiorId})

我这有 投递了一个异步事件, 经常发现 这个事件执行了几次 后面都没 执行了,
换成同步方法
event.Fire("new.member", event.M{"memberId": member.ID, "superiorId": superiorId})

fire 就都能成功

见图片,并发下数据会有问题

func (em *Manager) Fire(name string, params M) (err error, e Event) {
	name = goodName(name)

	// NOTICE: must check the '*' global listeners
	if false == em.HasListeners(name) && false == em.HasListeners(Wildcard) {
		// has group listeners. "app.*" "app.db.*"
		// eg: "app.db.run" will trigger listeners on the "app.db.*"
		pos := strings.LastIndexByte(name, '.')
		if pos < 0 || pos == len(name)-1 {
			return // not found listeners.
		}

		groupName := name[:pos+1] + Wildcard // "app.db.*"
		if false == em.HasListeners(groupName) {
			return // not found listeners.
		}
	}

	// call listeners use defined Event
	if e, ok := em.events[name]; ok {
		if params != nil {
			e.SetData(params)
		}

		err = em.FireEvent(e)
		return err, e
	}

	// create a basic event instance
	e = em.newBasicEvent(name, params)
	// call listeners handle event
	err = em.FireEvent(e)
	return
}

这个函数的如下逻辑

if e, ok := em.events[name]; ok {
		if params != nil {
			e.SetData(params)
		}

		err = em.FireEvent(e)
		return err, e
	}

在并发下会有数据互相覆盖的情况

支持rocketmq Kafka之类的驱动吗

System (please complete the following information):

  • OS: linux [e.g. linux, macOS]
  • GO Version: 1.13 [e.g. 1.13]
  • Pkg Version: 1.1.1 [e.g. 1.1.1]

Describe the bug

A clear and concise description of what the bug is.

To Reproduce

// go code

Expected behavior

A clear and concise description of what you expected to happen.

Screenshots

If applicable, add screenshots to help explain your problem.

Additional context

Add any other context about the problem here.

event.on("*", xxxx) not work

Event2FurcasTicketCreate              = "kapal.furcas.ticket.create"

event.On("*", &kapal.DefaultNotify)                           // not work
event.On("kapal.furcas.ticket*", &kapal.DefaultNotify         // not work
event.On(kapal.Event2FurcasTicketReset, &kapal.DefaultNotify) // work

异步多线程消费

System (please complete the following information):

  • OS: linux
  • GO Version: 1.20
  • Pkg Version: 1.1.1

Describe the bug

  1. 设置 o.ConsumerNum = 10 多个消费者,与预期不一致(感觉还是只有一个routine在执行)

To Reproduce

// preapre: 此时设置 ConsumerNum = 10, 每个任务耗时1s, 触发100个任务
// expected: 10s左右执行完所有任务
// actual: 执行了 100s左右
func TestChanEvent2(t *testing.T) {
	var em = event.NewManager("default", func(o *event.Options) {
		o.ConsumerNum = 10
	})
	defer em.CloseWait()

	var listener event.ListenerFunc = func(e event.Event) error {
		time.Sleep(1 * time.Second)
		return nil
	}

	em.On("app.evt1", listener, event.Normal)

	for i := 0; i < 100; i++ {
		em.FireAsync(event.New("app.evt1", event.M{"arg0": "val2"}))
	}

	fmt.Println("publish event finished!")
}

Screenshots

image

Additional context

  1. 是有什么特殊设置嘛(打开姿势不对? @RelicOfTesla @inhere 可以帮忙看一下嘛,谢谢~
  2. go 可以多个消费者并发读取 chan啊
    // 这个例子就是 10s 左右完成所有任务
    func TestConcurrentRead(t *testing.T) {
        var wg sync.WaitGroup
        defer wg.Wait()
        ch := make(chan int, 500)
    
        // 1. 启动多个 goroutine 读取数据
        routine(&wg, 10, ch)
    
        // 2. 向 chan 内写数据
        for i := 0; i < 100; i++ {
    	    ch <- i
        }
    
        // 3. 关闭 channel
        close(ch)
    }
    
    func routine(wg *sync.WaitGroup, count int, ch chan int) {
        for i := 0; i < count; i++ {
    	    wg.Add(1)
    	    go func(id int) {
    		    defer wg.Done()
    		    for {
    			    data, ok := <-ch
    			    if !ok {
    				    fmt.Printf("Goroutine %d: Channel closed\n", id)
    				    return
    			    }
    
    			    time.Sleep(1 * time.Second)
    			    fmt.Printf("Goroutine %d: Received %d\n", id, data)
    		    }
    	    }(i)
        }
    }

event.on("*", xxxx) not work

Event2FurcasTicketCreate              = "kapal.furcas.ticket.create"

event.On("*", &kapal.DefaultNotify)                           // not work
event.On("kapal.furcas.ticket*", &kapal.DefaultNotify)         // not work
event.On(kapal.Event2FurcasTicketReset, &kapal.DefaultNotify) // work

这是最小可复现代码

type test_notify struct {}

var is_Run = false

func (notify *test_notify) Handle(e event.Event) error {
	is_Run = true
	return nil
}

func TestTicketCreate_Handle_1(t *testing.T) {
	TestNotify := test_notify{}
	event.On("*", &TestNotify)
	err, _ := event.Fire("test_notify", event.M{})
	assert.Nil(t, err)
	assert.NotEqual(t, is_Run, true)

	event.On("test_notify", &TestNotify)
	err, _ = event.Fire("test_notify", event.M{})
	assert.Nil(t, err)
	assert.Equal(t, is_Run, true)
}

I'm here again,V1.1.1 has goroutine leak.please see this eg.

System (please complete the following information):

  • OS: linux
  • GO Version: 1.20
  • Pkg Version: 1.1.1

Describe the bug

goroutine leak!!!

To Reproduce

leak

func concurrentReq() {
	var cnt int32 = 0
	event.On("evt1", event.ListenerFunc(func(e event.Event) error {
		//ZapLog.Debug(fmt.Sprintf("handle event start: %s,data:%v", e.Name(), e.Data()))
		time.Sleep(10 * time.Second)
		//ZapLog.Debug(fmt.Sprintf("handle event end: %s,data:%v", e.Name(), e.Data()))
		atomic.AddInt32(&cnt, 1)
		return nil
	}), event.Normal)

	for i := 0; i < 10000; i++ {
		go func(num int) {
			err, _ := event.Fire("evt1", event.M{"task": num})
			if err != nil {
				ZapLog.Debug(fmt.Sprintf("num:%d,event err: %v", err, num))
			}
		}(i)
	}
	time.Sleep(20 * time.Second)

	ZapLog.Debug(fmt.Sprintf("handle event cnt: %d,goroutes:%d", cnt, runtime.NumGoroutine()))
}

the result is:2023-09-01T11:36:44.616+0800 DEBUG library/event.go:12 handle event cnt: 2,goroutes:9999

no leak

func concurrentReq() {
	var cnt int32 = 0
	event.On("evt1", event.ListenerFunc(func(e event.Event) error {
		//ZapLog.Debug(fmt.Sprintf("handle event start: %s,data:%v", e.Name(), e.Data()))
		//time.Sleep(10 * time.Second)
		//ZapLog.Debug(fmt.Sprintf("handle event end: %s,data:%v", e.Name(), e.Data()))
		atomic.AddInt32(&cnt, 1)
		return nil
	}), event.Normal)

	for i := 0; i < 10000; i++ {
		go func(num int) {
			err, _ := event.Fire("evt1", event.M{"task": num})
			if err != nil {
				ZapLog.Debug(fmt.Sprintf("num:%d,event err: %v", err, num))
			}
		}(i)
	}
	time.Sleep(20 * time.Second)

	ZapLog.Debug(fmt.Sprintf("handle event cnt: %d,goroutes:%d", cnt, runtime.NumGoroutine()))
}

the result is:2023-09-01T11:36:11.441+0800 DEBUG library/event.go:12 handle event cnt: 10000,goroutes:1

functionality Wildcard

Thanks a lot for the library. Let me ask a question.
Are there any plans to implement the functionality Wildcard event name ex. "eve.some.*.*" and "eve.some.*.run"?

AwaitFire leak, chan not close

func (em *Manager) AwaitFire(e Event) (err error) {
	ch := make(chan error)

	go func(e Event) {
		err := em.FireEvent(e)
		ch <- err
	}(e)

	err = <-ch
                      // MUST    close(ch)
	return
}

Any more examples on how to use some of the capabilities?

Hello,

I am developing a new type of database that will also allow for event triggers when things are added, modified, deleted, etc. as well as a number of other functionality as this will be a type of Graph database.

The use of triggers should substantially the capabilities of the database for the user and so far, your "event" repo seems like it is the best one that I have come across for this purpose.

It would be nice to see a number of different examples showing various capabilities of the library.

Do you have more examples that I could see?
Thanks

Possible to add Event Timer Triggers?

Hello,

I am liking your Event library more and more as I think that it really has good potential.

One question is as to how well it scales?

and another question is if it could be extended to allow for setting up of Timed Event Triggers that would fire:

  1. Fire on a particular Time or Date?
  2. Fire periodically as to say every 30 seconds, or minutes, etc. (maybe event shorter or longer time intervals)?
  3. Other ways that times might be used?

Then functions could be activated based upon timed events.

This would be a very useful addition.
Thanks

RemoveListener not support closure function

func makeFn(a int) event.ListenerFunc {
	return func(e event.Event) error {
		fmt.Println(a)
		return nil
	}
}
func main() {
	evBus := event.NewManager("")
	f1 := makeFn(11)
	evBus.On("evt1", f1)
	f2 := makeFn(22)
	evBus.On("evt1", f2)
	evBus.RemoveListener("evt1", f1) // DON'T REMOVE ALL !!!
	evBus.MustFire("evt1", event.M{"arg0": "val0", "arg1": "val1"})
}

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.