Giter Club home page Giter Club logo

timingwheel's Introduction

timingwheel

Golang implementation of Hierarchical Timing Wheels.

Installation

$ go get -u github.com/RussellLuo/timingwheel

Design

timingwheel is ported from Kafka's purgatory, which is designed based on Hierarchical Timing Wheels.

中文博客:层级时间轮的 Golang 实现

Documentation

For usage and examples see the Godoc.

Benchmark

$ go test -bench=. -benchmem
goos: darwin
goarch: amd64
pkg: github.com/RussellLuo/timingwheel
BenchmarkTimingWheel_StartStop/N-1m-8            5000000               329 ns/op              83 B/op          2 allocs/op
BenchmarkTimingWheel_StartStop/N-5m-8            5000000               363 ns/op              95 B/op          2 allocs/op
BenchmarkTimingWheel_StartStop/N-10m-8           5000000               440 ns/op              37 B/op          1 allocs/op
BenchmarkStandardTimer_StartStop/N-1m-8         10000000               199 ns/op              64 B/op          1 allocs/op
BenchmarkStandardTimer_StartStop/N-5m-8          2000000               644 ns/op              64 B/op          1 allocs/op
BenchmarkStandardTimer_StartStop/N-10m-8          500000              2434 ns/op              64 B/op          1 allocs/op
PASS
ok      github.com/RussellLuo/timingwheel       116.977s

License

MIT

timingwheel's People

Contributors

allenxuxu avatar russellluo avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

timingwheel's Issues

时间复杂度问题

虽然 DelayQueue 中 offer(添加)和 poll(获取并删除)操作的时间复杂度为 O(log n),但是相比定时任务的个数而言,bucket 的个数其实是非常小的(也就是 O(log n) 中的 n 很小),因此性能也是没有问题的。

看到作者博客中提到delayQueue中的元素个数应该只是跟bucket个数n有关,但是看到如下代码,在添加一个timer时,只要expiration不同,就会被添加到delayQueue中,所以这里会比较疑惑,这个时间复杂度是与“不同expiration的timer个数”有关?

if b.SetExpiration(virtualID * tw.tick) { tw.queue.Offer(b, b.Expiration()) }

定时器提前执行了

代码如下:

func (m *Manager) addTimer(t *task.Task) (timer *timingwheel.Timer, err error) {
	taskConfig, ok := m.taskConfigs[t.Name]
	if !ok {
		return nil, errors.Errorf("taskConfig %s not found", t.Name)
	}
	fn, err := taskConfig.GeneratorFunc(t)
	if err != nil {
		return nil, err
	}

	diff, err := util.CalcDiffWithNow(t.ExecDateTime)
	if err != nil {
		return nil, err
	}
	if diff < 0 {
		log.Warnf("timer diff is less the zero")
		return nil, nil
	}
	// 计算当前时间差(单位是秒)
	timer = m.tw.AfterFunc(time.Duration(diff)*time.Second, fn)
	log.Infof("add timer successfully, diff seconds:%d, task id:%s, object id:%s ", diff, t.ID)
	return timer, nil
}

我打印的日志:

add timer successfully, diff seconds:703726, task id:ObjectID("5e8    1fba9b8d1fca6422ec4c2")

也就是说会在703726秒后执行,大概是8天左右,但是这个任务第二天就执行了。。而且这是偶发的问题,是因为我这个时间间隔过大导致的吗

timewheel启动了很久,新插入一个定时器直接就过期

currentTime 只有在advanceClock中更新。假如 timewheel开了很久了,bucket 中没有定时器,新插入一个定时器,addOrRun读到currentTime 是不是直接就过期了?

func (tw *TimingWheel) advanceClock(expiration int64) {
	currentTime := atomic.LoadInt64(&tw.currentTime)
	if expiration >= currentTime+tw.tick {
		currentTime = truncate(expiration, tw.tick)
		atomic.StoreInt64(&tw.currentTime, currentTime)

		// Try to advance the clock of the overflow wheel if present
		overflowWheel := atomic.LoadPointer(&tw.overflowWheel)
		if overflowWheel != nil {
			(*TimingWheel)(overflowWheel).advanceClock(currentTime)
		}
	}
}

@RussellLuo

缺少 LICENSE 文件

问题:
由于缺少 LICENSE 文件,导致 pkg.go.dev 无法确定使用的 LICENSE,结果无法在线查看 GoDoc

data race error

OS:
macOS 10.14.6

golang: 1.12.9

testing code:

package main

import (
	"fmt"
	"math"
	"time"

	"github.com/RussellLuo/timingwheel"
)

func main() {
	timeout := 5 * time.Second

	tw := timingwheel.NewTimingWheel(time.Second, 20)
	tw.Start()
	defer tw.Stop()
	loop(tw, timeout)
	time.Sleep(3 * time.Second)
}

func loop(tw *timingwheel.TimingWheel, timeout time.Duration) error {
	total := int(math.RoundToEven(timeout.Seconds()))
	running := make(chan time.Time, 1)
	for i := 0; i < total; i++ {
		tw.AfterFunc(time.Duration(i)*time.Second, func() {
			running <- time.Now()
		})
	}
	timeoutExceeded := make(chan time.Time, 1)
	tw.AfterFunc(timeout, func() {
		// fmt.Println("The timer fires")
		timeoutExceeded <- time.Now()
	})

	count := 0
	for {
		select {
		case <-timeoutExceeded:
			fmt.Println(" --------------- timeout ")
			// break;
			return nil
		case <-running:
			fmt.Println("------------------ runing ", time.Now().String())
			count++
			if count == 6 {
				return nil
			}
		}
	}
}

result:

/Users/qinshen/go/src/vk/test-example/timingwheel   go version 
go version go1.12.9 darwin/amd64


 /Users/qinshen/go/src/vk/test-example/timingwheel   go run -race .
------------------ runing  2019-08-19 09:23:25.164081 +0800 CST m=+0.000580086
==================
WARNING: DATA RACE
Write at 0x00c00000e458 by goroutine 6:
  github.com/RussellLuo/timingwheel/delayqueue.(*priorityQueue).Swap()
      /Users/qinshen/go/pkg/mod/github.com/!russell!luo/[email protected]/delayqueue/delayqueue.go:37 +0x165
  container/heap.Remove()
      /usr/local/go/src/container/heap/heap.go:72 +0xc2
  github.com/RussellLuo/timingwheel/delayqueue.(*priorityQueue).PeekAndShift()
      /Users/qinshen/go/pkg/mod/github.com/!russell!luo/[email protected]/delayqueue/delayqueue.go:78 +0x113
  github.com/RussellLuo/timingwheel/delayqueue.(*DelayQueue).Poll()
      /Users/qinshen/go/pkg/mod/github.com/!russell!luo/[email protected]/delayqueue/delayqueue.go:131 +0x96
  github.com/RussellLuo/timingwheel.(*TimingWheel).Start.func1()
      /Users/qinshen/go/pkg/mod/github.com/!russell!luo/[email protected]/timingwheel.go:137 +0x93
  github.com/RussellLuo/timingwheel.(*waitGroupWrapper).Wrap.func1()
      /Users/qinshen/go/pkg/mod/github.com/!russell!luo/[email protected]/utils.go:29 +0x34

Previous read at 0x00c00000e458 by main goroutine:
  github.com/RussellLuo/timingwheel/delayqueue.(*DelayQueue).Offer()
      /Users/qinshen/go/pkg/mod/github.com/!russell!luo/[email protected]/delayqueue/delayqueue.go:116 +0x13f
  github.com/RussellLuo/timingwheel.(*TimingWheel).add()
      /Users/qinshen/go/pkg/mod/github.com/!russell!luo/[email protected]/timingwheel.go:86 +0x324
  github.com/RussellLuo/timingwheel.(*TimingWheel).addOrRun()
      /Users/qinshen/go/pkg/mod/github.com/!russell!luo/[email protected]/timingwheel.go:113 +0x42
  github.com/RussellLuo/timingwheel.(*TimingWheel).AfterFunc()
      /Users/qinshen/go/pkg/mod/github.com/!russell!luo/[email protected]/timingwheel.go:173 +0x1ae
  main.loop()
      /Users/qinshen/go/src/vk/test-example/timingwheel/main.go:30 +0x271
  main.main()
      /Users/qinshen/go/src/vk/test-example/timingwheel/main.go:17 +0x9a

Goroutine 6 (running) created at:
  github.com/RussellLuo/timingwheel.(*waitGroupWrapper).Wrap()
      /Users/qinshen/go/pkg/mod/github.com/!russell!luo/[email protected]/utils.go:28 +0x6f
  github.com/RussellLuo/timingwheel.(*TimingWheel).Start()
      /Users/qinshen/go/pkg/mod/github.com/!russell!luo/[email protected]/timingwheel.go:136 +0xa6
  main.main()
      /Users/qinshen/go/src/vk/test-example/timingwheel/main.go:15 +0x57
==================
------------------ runing  2019-08-19 09:23:26.005732 +0800 CST m=+0.842259261
------------------ runing  2019-08-19 09:23:27.005475 +0800 CST m=+1.842035506
------------------ runing  2019-08-19 09:23:28.003353 +0800 CST m=+2.839947241
------------------ runing  2019-08-19 09:23:29.00319 +0800 CST m=+3.839817411
 --------------- timeout 
Found 1 data race(s)
exit status 66

and, testing output in riming wheel self

/Users/qinshen/go/src/github.com/tsingson/timingwheel   go test -race  -count=5 .
==================
WARNING: DATA RACE
Write at 0x00c00000fdb8 by goroutine 39:
  github.com/RussellLuo/timingwheel/delayqueue.(*priorityQueue).Pop()
      /Users/qinshen/go/src/github.com/tsingson/timingwheel/delayqueue/delayqueue.go:64 +0xda
  container/heap.Remove()
      /usr/local/go/src/container/heap/heap.go:77 +0x68
  github.com/RussellLuo/timingwheel/delayqueue.(*priorityQueue).PeekAndShift()
      /Users/qinshen/go/src/github.com/tsingson/timingwheel/delayqueue/delayqueue.go:78 +0x113
  github.com/RussellLuo/timingwheel/delayqueue.(*DelayQueue).Poll()
      /Users/qinshen/go/src/github.com/tsingson/timingwheel/delayqueue/delayqueue.go:131 +0x96
  github.com/RussellLuo/timingwheel.(*TimingWheel).Start.func1()
      /Users/qinshen/go/src/github.com/tsingson/timingwheel/timingwheel.go:137 +0x93
  github.com/RussellLuo/timingwheel.(*waitGroupWrapper).Wrap.func1()
      /Users/qinshen/go/src/github.com/tsingson/timingwheel/utils.go:29 +0x34

Previous read at 0x00c00000fdb8 by main goroutine:
  github.com/RussellLuo/timingwheel/delayqueue.(*DelayQueue).Offer()
      /Users/qinshen/go/src/github.com/tsingson/timingwheel/delayqueue/delayqueue.go:116 +0x13f
  github.com/RussellLuo/timingwheel.(*TimingWheel).add()
      /Users/qinshen/go/src/github.com/tsingson/timingwheel/timingwheel.go:86 +0x324
  github.com/RussellLuo/timingwheel.(*TimingWheel).add()
      /Users/qinshen/go/src/github.com/tsingson/timingwheel/timingwheel.go:106 +0x35b
  github.com/RussellLuo/timingwheel.(*TimingWheel).add()
      /Users/qinshen/go/src/github.com/tsingson/timingwheel/timingwheel.go:106 +0x35b
  github.com/RussellLuo/timingwheel.(*TimingWheel).addOrRun()
      /Users/qinshen/go/src/github.com/tsingson/timingwheel/timingwheel.go:113 +0x42
  github.com/RussellLuo/timingwheel.(*TimingWheel).AfterFunc()
      /Users/qinshen/go/src/github.com/tsingson/timingwheel/timingwheel.go:173 +0x1ae
  github.com/RussellLuo/timingwheel_test.Example_stopTimer()
      /Users/qinshen/go/src/github.com/tsingson/timingwheel/timingwheel_examples_test.go:32 +0xa4
  testing.runExample()
      /usr/local/go/src/testing/example.go:121 +0x2a4
  testing.runExamples()
      /usr/local/go/src/testing/example.go:45 +0x213
  testing.(*M).Run()
      /usr/local/go/src/testing/testing.go:1073 +0x39f
  main.main()
      _testmain.go:52 +0x222

Goroutine 39 (running) created at:
  github.com/RussellLuo/timingwheel.(*waitGroupWrapper).Wrap()
      /Users/qinshen/go/src/github.com/tsingson/timingwheel/utils.go:28 +0x6f
  github.com/RussellLuo/timingwheel.(*TimingWheel).Start()
      /Users/qinshen/go/src/github.com/tsingson/timingwheel/timingwheel.go:136 +0xa6
  github.com/RussellLuo/timingwheel_test.Example_stopTimer()
      /Users/qinshen/go/src/github.com/tsingson/timingwheel/timingwheel_examples_test.go:29 +0x57
  testing.runExample()
      /usr/local/go/src/testing/example.go:121 +0x2a4
  testing.runExamples()
      /usr/local/go/src/testing/example.go:45 +0x213
  testing.(*M).Run()
      /usr/local/go/src/testing/testing.go:1073 +0x39f
  main.main()
      _testmain.go:52 +0x222
==================
FAIL
FAIL	github.com/RussellLuo/timingwheel	10.304s

Add support for scheduling repetitive tasks

Motivation

Currently timingwheel only supports one-time tasks (via AfterFunc), it's meaningful to provide a general mechanism for supporting repetitive tasks (a.k.a. Cron jobs).

(For Chinese users, also see #8.)

The API

Inspired by cron.Schedule, the API is designed to be as follows:

// Scheduler determines the execution plan of a task.
type Scheduler interface {
        // Next returns the next execution time after the given (previous) time.
        // It will return a zero time if no next time is scheduled.
        Next(time.Time) time.Time
}

// ScheduleFunc calls f (in its own goroutine) according to the execution
// plan scheduled by s.
// It returns a Timer that can be used to cancel the call using its Stop method.
//
// Internally, ScheduleFunc will ask the first execution time (by calling 
// s.Next()) initially, and create a timer if the execution time is non-zero.
// Afterwards, it will ask the next execution time each time f is about to
// be executed, and f will be called at the next execution time if the time
// is non-zero.
func (tw *TimingWheel) ScheduleFunc(s Scheduler, f func()) *Timer {
        ...
}

Usage example

Based on the Scheduler interface,it's possible for us to implement all kinds of repetitive tasks.

EveryFunc shortcut

As an example (as well as a comparison of #9), the new implementation will be as follows:

type FuncScheduler func(time.Time) time.Time

func (fs FuncScheduler) Next(prev time.Time) time.Time {
        return fs(prev)
}

func EveryFunc(d time.Duration, f func()) *Timer {
        return tw.ScheduleFunc(FuncScheduler(func(prev time.Time) time.Time {
                return prev.Add(d)
        }), f)
}

Cron expression

By leveraging cronexpr or cron.Parser to parse Cron expression, it's possible for us to support Cron jobs in a more conventional way.

关于addOrRun方法的疑问

// addOrRun inserts the timer t into the current timing wheel, or run the
// timer's task if it has already expired.
func (tw *TimingWheel) addOrRun(t *Timer) {
	if !tw.add(t) {
		// Already expired

		// Like the standard time.AfterFunc (https://golang.org/pkg/time/#AfterFunc),
		// always execute the timer's task in its own goroutine.
		go t.task()
	}
}

如果大批量的时间过期,那此时不是会同时开启多个goroutine,会不会存在内存暴涨的情况。毕竟时间轮实用的场景是大量的时间任务管理的情况。其次就是为什么不考虑在此处加入协程池

其次,我想要在调用ScheduleFunc的过程中,限制任务的执行次数比如执行10次,每次间隔1s就够。如果我采用stop停止当前循环任务,无法达到精准控制次数的目的。

deadlock

this is code:

package main

import (
	"fmt"
	"net/http"
	_ "net/http/pprof"
	"sync/atomic"
	"time"

	"github.com/RussellLuo/timingwheel"
)

type EveryScheduler struct {
	Interval time.Duration
}

func (s *EveryScheduler) Next(prev time.Time) time.Time {
	return prev.Add(s.Interval)
}

func main() {
	tw := timingwheel.NewTimingWheel(time.Millisecond, 60)
	tw.Start()
	defer tw.Stop()
	now := time.Now()
	var addCount int64
	for i := 0; i < 100000; i++ {
		_ = tw.ScheduleFunc(&EveryScheduler{time.Millisecond}, func() {
			atomic.AddInt64(&addCount, 1)
		})
	}

	go func() {
		http.ListenAndServe("0.0.0.0:8000", nil)
	}()
	for {
		n := time.Now()
		fmt.Println(atomic.LoadInt64(&addCount), n.Sub(now))
		now = n
		time.Sleep(time.Second)
	}
}

then, stdout print:

62576 83.7743ms
2516677 1.0003254s
3937967 1.0003248s
5276756 1.0003255s
6510020 1.0000941s
7849283 1.0003247s
9158321 1.0003253s
10420686 1.000325s
11601313 1.0003252s
12916420 1.0003255s
14280131 1.0003252s
15661477 1.0003246s
16964818 1.0003252s
17022683 1.0003653s
17022683 1.000204s
17022683 1.0003877s
17022683 1.0007381s
17022683 1.0004781s
17022683 1.0007608s
17022683 1.0000029s

load pprof:

http://127.0.0.1:8000/debug/pprof/goroutine?debug=2

all goroutine stop for wait mutex

goroutine 7 [semacquire]:
sync.runtime_SemacquireMutex(0xc00008a34c, 0xc000167c00, 0x1)
	C:/Go/src/runtime/sema.go:71 +0x4e
sync.(*Mutex).lockSlow(0xc00008a348)
	C:/Go/src/sync/mutex.go:138 +0x2a8
sync.(*Mutex).Lock(0xc00008a348)
	C:/Go/src/sync/mutex.go:81 +0x65
github.com/RussellLuo/timingwheel.(*bucket).Add(0xc00008a340, 0xc000493780)
	D:/GOPATH/src/github.com/RussellLuo/timingwheel/bucket.go:84 +0x45
github.com/RussellLuo/timingwheel.(*TimingWheel).add(0xc00009e0e0, 0xc000493780, 0xe5d5e00)
	D:/GOPATH/src/github.com/RussellLuo/timingwheel/timingwheel.go:81 +0x157
github.com/RussellLuo/timingwheel.(*TimingWheel).addOrRun(0xc00009e0e0, 0xc000493780)
	D:/GOPATH/src/github.com/RussellLuo/timingwheel/timingwheel.go:119 +0x3c
github.com/RussellLuo/timingwheel.(*TimingWheel).ScheduleFunc.func1()
	D:/GOPATH/src/github.com/RussellLuo/timingwheel/timingwheel.go:225 +0x1a9
github.com/panjf2000/ants.(*goWorker).run.func1(0xc000075a40)
	D:/GOPATH/src/github.com/panjf2000/ants/worker.go:68 +0xf2
created by github.com/panjf2000/ants.(*goWorker).run
	D:/GOPATH/src/github.com/panjf2000/ants/worker.go:48 +0x5e

goroutine 40 [semacquire]:
sync.runtime_SemacquireMutex(0xc00008a34c, 0xc003da2600, 0x1)
	C:/Go/src/runtime/sema.go:71 +0x4e
sync.(*Mutex).lockSlow(0xc00008a348)
	C:/Go/src/sync/mutex.go:138 +0x2a8
sync.(*Mutex).Lock(0xc00008a348)
	C:/Go/src/sync/mutex.go:81 +0x65
github.com/RussellLuo/timingwheel.(*bucket).Add(0xc00008a340, 0xc003c22ee0)
	D:/GOPATH/src/github.com/RussellLuo/timingwheel/bucket.go:84 +0x45
github.com/RussellLuo/timingwheel.(*TimingWheel).add(0xc00009e0e0, 0xc003c22ee0, 0xe5d5e00)
	D:/GOPATH/src/github.com/RussellLuo/timingwheel/timingwheel.go:81 +0x157
github.com/RussellLuo/timingwheel.(*TimingWheel).addOrRun(0xc00009e0e0, 0xc003c22ee0)
	D:/GOPATH/src/github.com/RussellLuo/timingwheel/timingwheel.go:119 +0x3c
github.com/RussellLuo/timingwheel.(*TimingWheel).ScheduleFunc.func1()
	D:/GOPATH/src/github.com/RussellLuo/timingwheel/timingwheel.go:225 +0x1a9
github.com/panjf2000/ants.(*goWorker).run.func1(0xc00030c270)
	D:/GOPATH/src/github.com/panjf2000/ants/worker.go:68 +0xf2
created by github.com/panjf2000/ants.(*goWorker).run
	D:/GOPATH/src/github.com/panjf2000/ants/worker.go:48 +0x5e

goroutine 41 [semacquire]:
sync.runtime_SemacquireMutex(0xc00008a34c, 0xc000165c00, 0x1)
	C:/Go/src/runtime/sema.go:71 +0x4e
sync.(*Mutex).lockSlow(0xc00008a348)
	C:/Go/src/sync/mutex.go:138 +0x2a8
sync.(*Mutex).Lock(0xc00008a348)
	C:/Go/src/sync/mutex.go:81 +0x65
github.com/RussellLuo/timingwheel.(*bucket).Add(0xc00008a340, 0xc00064b620)
	D:/GOPATH/src/github.com/RussellLuo/timingwheel/bucket.go:84 +0x45
github.com/RussellLuo/timingwheel.(*TimingWheel).add(0xc00009e0e0, 0xc00064b620, 0xe5d5e00)
	D:/GOPATH/src/github.com/RussellLuo/timingwheel/timingwheel.go:81 +0x157
github.com/RussellLuo/timingwheel.(*TimingWheel).addOrRun(0xc00009e0e0, 0xc00064b620)
	D:/GOPATH/src/github.com/RussellLuo/timingwheel/timingwheel.go:119 +0x3c
github.com/RussellLuo/timingwheel.(*TimingWheel).ScheduleFunc.func1()
	D:/GOPATH/src/github.com/RussellLuo/timingwheel/timingwheel.go:225 +0x1a9
github.com/panjf2000/ants.(*goWorker).run.func1(0xc00030c2a0)
	D:/GOPATH/src/github.com/panjf2000/ants/worker.go:68 +0xf2
created by github.com/panjf2000/ants.(*goWorker).run
	D:/GOPATH/src/github.com/panjf2000/ants/worker.go:48 +0x5e

goroutine 42 [semacquire]:
sync.runtime_SemacquireMutex(0xc00008a34c, 0xc000323c00, 0x1)
	C:/Go/src/runtime/sema.go:71 +0x4e
sync.(*Mutex).lockSlow(0xc00008a348)
	C:/Go/src/sync/mutex.go:138 +0x2a8
sync.(*Mutex).Lock(0xc00008a348)
	C:/Go/src/sync/mutex.go:81 +0x65
github.com/RussellLuo/timingwheel.(*bucket).Add(0xc00008a340, 0xc000510f20)
	D:/GOPATH/src/github.com/RussellLuo/timingwheel/bucket.go:84 +0x45
github.com/RussellLuo/timingwheel.(*TimingWheel).add(0xc00009e0e0, 0xc000510f20, 0xe5d5e00)
	D:/GOPATH/src/github.com/RussellLuo/timingwheel/timingwheel.go:81 +0x157
github.com/RussellLuo/timingwheel.(*TimingWheel).addOrRun(0xc00009e0e0, 0xc000510f20)
	D:/GOPATH/src/github.com/RussellLuo/timingwheel/timingwheel.go:119 +0x3c
github.com/RussellLuo/timingwheel.(*TimingWheel).ScheduleFunc.func1()
	D:/GOPATH/src/github.com/RussellLuo/timingwheel/timingwheel.go:225 +0x1a9
github.com/panjf2000/ants.(*goWorker).run.func1(0xc00030c2d0)
	D:/GOPATH/src/github.com/panjf2000/ants/worker.go:68 +0xf2
created by github.com/panjf2000/ants.(*goWorker).run
	D:/GOPATH/src/github.com/panjf2000/ants/worker.go:48 +0x5e

goroutine 43 [semacquire]:
sync.runtime_SemacquireMutex(0xc00008a34c, 0xc000325c00, 0x1)
	C:/Go/src/runtime/sema.go:71 +0x4e
sync.(*Mutex).lockSlow(0xc00008a348)
	C:/Go/src/sync/mutex.go:138 +0x2a8
sync.(*Mutex).Lock(0xc00008a348)
	C:/Go/src/sync/mutex.go:81 +0x65
github.com/RussellLuo/timingwheel.(*bucket).Add(0xc00008a340, 0xc003b756e0)
	D:/GOPATH/src/github.com/RussellLuo/timingwheel/bucket.go:84 +0x45
github.com/RussellLuo/timingwheel.(*TimingWheel).add(0xc00009e0e0, 0xc003b756e0, 0xe5d5e00)
	D:/GOPATH/src/github.com/RussellLuo/timingwheel/timingwheel.go:81 +0x157
github.com/RussellLuo/timingwheel.(*TimingWheel).addOrRun(0xc00009e0e0, 0xc003b756e0)
	D:/GOPATH/src/github.com/RussellLuo/timingwheel/timingwheel.go:119 +0x3c
github.com/RussellLuo/timingwheel.(*TimingWheel).ScheduleFunc.func1()
	D:/GOPATH/src/github.com/RussellLuo/timingwheel/timingwheel.go:225 +0x1a9
github.com/panjf2000/ants.(*goWorker).run.func1(0xc00030c300)
	D:/GOPATH/src/github.com/panjf2000/ants/worker.go:68 +0xf2
created by github.com/panjf2000/ants.(*goWorker).run
	D:/GOPATH/src/github.com/panjf2000/ants/worker.go:48 +0x5e

goroutine 44 [semacquire]:
sync.runtime_SemacquireMutex(0xc00008a34c, 0xc00031fc00, 0x1)
	C:/Go/src/runtime/sema.go:71 +0x4e
sync.(*Mutex).lockSlow(0xc00008a348)
	C:/Go/src/sync/mutex.go:138 +0x2a8
sync.(*Mutex).Lock(0xc00008a348)
	C:/Go/src/sync/mutex.go:81 +0x65
github.com/RussellLuo/timingwheel.(*bucket).Add(0xc00008a340, 0xc003a7aac0)
	D:/GOPATH/src/github.com/RussellLuo/timingwheel/bucket.go:84 +0x45
github.com/RussellLuo/timingwheel.(*TimingWheel).add(0xc00009e0e0, 0xc003a7aac0, 0xe5d5e00)
	D:/GOPATH/src/github.com/RussellLuo/timingwheel/timingwheel.go:81 +0x157
github.com/RussellLuo/timingwheel.(*TimingWheel).addOrRun(0xc00009e0e0, 0xc003a7aac0)
	D:/GOPATH/src/github.com/RussellLuo/timingwheel/timingwheel.go:119 +0x3c
github.com/RussellLuo/timingwheel.(*TimingWheel).ScheduleFunc.func1()
	D:/GOPATH/src/github.com/RussellLuo/timingwheel/timingwheel.go:225 +0x1a9
github.com/panjf2000/ants.(*goWorker).run.func1(0xc00030c330)
	D:/GOPATH/src/github.com/panjf2000/ants/worker.go:68 +0xf2
created by github.com/panjf2000/ants.(*goWorker).run
	D:/GOPATH/src/github.com/panjf2000/ants/worker.go:48 +0x5e

goroutine 45 [semacquire]:
sync.runtime_SemacquireMutex(0xc00008a34c, 0xc0039b0b00, 0x1)
	C:/Go/src/runtime/sema.go:71 +0x4e
sync.(*Mutex).lockSlow(0xc00008a348)
	C:/Go/src/sync/mutex.go:138 +0x2a8
sync.(*Mutex).Lock(0xc00008a348)
	C:/Go/src/sync/mutex.go:81 +0x65
github.com/RussellLuo/timingwheel.(*bucket).Add(0xc00008a340, 0xc00210a1a0)
	D:/GOPATH/src/github.com/RussellLuo/timingwheel/bucket.go:84 +0x45
github.com/RussellLuo/timingwheel.(*TimingWheel).add(0xc00009e0e0, 0xc00210a1a0, 0xe5d5e00)
	D:/GOPATH/src/github.com/RussellLuo/timingwheel/timingwheel.go:81 +0x157
github.com/RussellLuo/timingwheel.(*TimingWheel).addOrRun(0xc00009e0e0, 0xc00210a1a0)
	D:/GOPATH/src/github.com/RussellLuo/timingwheel/timingwheel.go:119 +0x3c
github.com/RussellLuo/timingwheel.(*TimingWheel).ScheduleFunc.func1()
	D:/GOPATH/src/github.com/RussellLuo/timingwheel/timingwheel.go:225 +0x1a9
github.com/panjf2000/ants.(*goWorker).run.func1(0xc00030c360)
	D:/GOPATH/src/github.com/panjf2000/ants/worker.go:68 +0xf2
created by github.com/panjf2000/ants.(*goWorker).run
	D:/GOPATH/src/github.com/panjf2000/ants/worker.go:48 +0x5e

goroutine 46 [semacquire]:
sync.runtime_SemacquireMutex(0xc00008a34c, 0xc00032bc00, 0x1)
	C:/Go/src/runtime/sema.go:71 +0x4e
sync.(*Mutex).lockSlow(0xc00008a348)
	C:/Go/src/sync/mutex.go:138 +0x2a8
sync.(*Mutex).Lock(0xc00008a348)
	C:/Go/src/sync/mutex.go:81 +0x65
github.com/RussellLuo/timingwheel.(*bucket).Add(0xc00008a340, 0xc003e77ce0)
	D:/GOPATH/src/github.com/RussellLuo/timingwheel/bucket.go:84 +0x45
github.com/RussellLuo/timingwheel.(*TimingWheel).add(0xc00009e0e0, 0xc003e77ce0, 0xe5d5e00)
	D:/GOPATH/src/github.com/RussellLuo/timingwheel/timingwheel.go:81 +0x157
github.com/RussellLuo/timingwheel.(*TimingWheel).addOrRun(0xc00009e0e0, 0xc003e77ce0)
	D:/GOPATH/src/github.com/RussellLuo/timingwheel/timingwheel.go:119 +0x3c
github.com/RussellLuo/timingwheel.(*TimingWheel).ScheduleFunc.func1()
	D:/GOPATH/src/github.com/RussellLuo/timingwheel/timingwheel.go:225 +0x1a9
github.com/panjf2000/ants.(*goWorker).run.func1(0xc00030c390)
	D:/GOPATH/src/github.com/panjf2000/ants/worker.go:68 +0xf2
created by github.com/panjf2000/ants.(*goWorker).run
	D:/GOPATH/src/github.com/panjf2000/ants/worker.go:48 +0x5e

请教一下, 为什么是用链表?

func (b *bucket) Flush(reinsert func(*Timer)) {
	var ts []*Timer

	b.mu.Lock()
	for e := b.timers.Front(); e != nil; {
		next := e.Next()

		t := e.Value.(*Timer)
		b.remove(t)
		ts = append(ts, t)

		e = next
	}
	b.mu.Unlock()

	b.SetExpiration(-1) // TODO: Improve the coordination with b.Add()

	for _, t := range ts {
		reinsert(t)
	}
}

bucket为什么是用链表呢? 没有体会到它的作用.
添加task的时候直接追加的数组尾部, 到期时直接遍历数组取出所有task. 因为每个bucket里的task到期时间都是一样的, 那直接用数组不行吗?
请大佬解惑

What is this chan used for?

Hi there,

I'm quite confused about this line,

// Send the expired element to the timing wheel.
.
After receiving the data from channel, it does nothing right?

But per your comment stated, it should send the expired element to wheel.
Can you please explain the usage?

Thank you!

32位机器运行会 panic: runtime error

package main

import (
	"fmt"
	"time"

	"github.com/RussellLuo/timingwheel"
)

type EveryScheduler struct {
	Interval time.Duration
}

func (s *EveryScheduler) Next(prev time.Time) time.Time {
	return prev.Add(s.Interval)
}

func main() {
	tw := timingwheel.NewTimingWheel(time.Millisecond, 20)
	tw.Start()
	defer tw.Stop()

	_ = tw.ScheduleFunc(&EveryScheduler{time.Second}, func() {
		fmt.Println("The timer fires")
	})

	select {}
}

在 centos6.5 32位的机器上运行如上代码会造成 panic ,centos7 则正常:

[root@develop test]# go run main.go
go: finding github.com/RussellLuo/timingwheel latest
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x8049fac]

goroutine 1 [running]:
runtime/internal/atomic.Xchg64(0x9c66bcc, 0xcdfcee50, 0x16d, 0x0, 0x1)
        /usr/lib/golang/src/runtime/internal/atomic/asm_386.s:151 +0xc
github.com/RussellLuo/timingwheel.(*bucket).SetExpiration(...)
        /home/gohome/pkg/mod/github.com/!russell!luo/[email protected]/bucket.go:74
github.com/RussellLuo/timingwheel.(*TimingWheel).add(0x9c761e0, 0x9c66620, 0x9c761e0)
        /home/gohome/pkg/mod/github.com/!russell!luo/[email protected]/timingwheel.go:79 +0x167
github.com/RussellLuo/timingwheel.(*TimingWheel).add(0x9c76140, 0x9c66620, 0x9c76140)
        /home/gohome/pkg/mod/github.com/!russell!luo/[email protected]/timingwheel.go:106 +0x1f2
github.com/RussellLuo/timingwheel.(*TimingWheel).add(0x9c760a0, 0x9c66620, 0x9c66640)
        /home/gohome/pkg/mod/github.com/!russell!luo/[email protected]/timingwheel.go:106 +0x1f2
github.com/RussellLuo/timingwheel.(*TimingWheel).addOrRun(0x9c760a0, 0x9c66620)
        /home/gohome/pkg/mod/github.com/!russell!luo/[email protected]/timingwheel.go:113 +0x29
github.com/RussellLuo/timingwheel.(*TimingWheel).ScheduleFunc(0x9c760a0, 0x81173c0, 0x9c120b8, 0x8105430, 0x9c760a0)
        /home/gohome/pkg/mod/github.com/!russell!luo/[email protected]/timingwheel.go:222 +0x2b5
main.main()
        /home/repo/test/main.go:23 +0xba
exit status 2

atomic 库中有如下说明:

Bugs
☞ On x86-32, the 64-bit functions use instructions unavailable before the Pentium MMX. On non-Linux ARM, the 64-bit functions use instructions unavailable before the ARMv6k core. On ARM, x86-32, and 32-bit MIPS, it is the caller's responsibility to arrange for 64-bit alignment of 64-bit words accessed atomically. The first word in a variable or in an allocated struct, array, or slice can be relied upon to be 64-bit aligned.

https://godoc.org/sync/atomic#pkg-note-bug

为什么用这个b unsafe.Pointer 而不用这个*bucket

// Timer represents a single event. When the Timer expires, the given
// task will be executed.
type Timer struct {
   expiration int64 // in milliseconds
   task       func()

   // The bucket that holds the list to which this timer's element belongs.
   //
   // NOTE: This field may be updated and read concurrently,
   // through Timer.Stop() and Bucket.Flush().
   b unsafe.Pointer // type: *bucket

   // The timer's element.
   element *list.Element
}

有什么考究吗?

bug of calculate expiration

the timingwheel add() 方法中:

else if t.expiration < tw.currentTime+tw.interval {
		// Put it into its own bucket
		virtualID := t.expiration / tw.tick
		b := tw.buckets[virtualID%tw.wheelSize]
		b.Add(t)

		// Set the bucket expiration time
		if b.SetExpiration(virtualID * tw.tick) {
			tw.queue.Offer(b)
		}

		return true
	}

求VirtualID应该是irtualID := (t.expiration - tw.currentTime +1) / tw.tick ,b.SetExpiration(virtualID * tw.tick) 应该是b.SetExpiration(tw.currentTime + virtualID * tw.tick)

func (tw *TimingWheel) advanceClock(expiration int64) {
	if expiration >= tw.currentTime+tw.tick {
		tw.currentTime = truncate(expiration, tw.tick)

		// Try to advance the clock of the overflow wheel if present
		overflowWheel := atomic.LoadPointer(&tw.overflowWheel)
		if overflowWheel != nil {
			(*TimingWheel)(overflowWheel).advanceClock(tw.currentTime)
		}
	}
}

advanceClock()中传入的是bucket. Expiration(),按照现在的逻辑bucket. Expiration()=virtualID * tw.tick,advanceClock()方法中的if语句应该是不会成立的。

时间轮对象可以重复使用么

type EveryScheduler struct {
Interval time.Duration
}

func (s *EveryScheduler) Next(prev time.Time) time.Time {
return prev.Add(s.Interval)
}

func main() {
tw := timingwheel.NewTimingWheel(time.Millisecond, 20)
tw.Start()
defer tw.Stop()

t := tw.ScheduleFunc(&EveryScheduler{time.Millisecond * 200}, func() {
	fmt.Println("The timer fires 200",time.Now())
})
t2 := tw.ScheduleFunc(&EveryScheduler{time.Millisecond * 500}, func() {
	fmt.Println("The timer fires 500",time.Now())
})

fmt.Println("main")
time.Sleep(time.Second * 10)
t.Stop()
t2.Stop()

}

时间轮必现定时任务没有执行

type AtomicInt32 struct {
	int32
}

func NewAtomicInt32(n int32) AtomicInt32 {
	return AtomicInt32{n}
}

// Add atomically adds n to the value.
func (i *AtomicInt32) Add(n int32) int32 {
	return atomic.AddInt32(&i.int32, n)
}

var timeCnt = NewAtomicInt32(0)
var tw = timingwheel.NewTimingWheel(time.Millisecond*1, 3)

func init() {
	tw.Start()
}

func CycleTest() {
	tw.AfterFunc(6*time.Millisecond, func() {
		c := timeCnt.Add(1)

		//打印输出经常在 1k~10w内就终止
		//多次运行可以偶现,或一直挂机运行可以偶现
		fmt.Println(c)

		if c < 1000000 {
			CycleTest()
		}
	})
}

func TestTimeS(t *testing.T) {
	CycleTest()
	<-make(chan struct{})
}

Scheduler 例子的问题

// We need to stop the timer since it will be restarted again and again.
    for !t.Stop() {
    }

我不明白为什么要用 for 循环去处理 t.Stop(), 按照 Stop() 的文档:

Stop prevents the Timer from firing. It returns true if the call stops the timer, false if the timer has already expired or been stopped.

If the timer t has already expired and the t.task has been started in its own goroutine; Stop does not wait for t.task to complete before returning. If the caller needs to know whether t.task is completed, it must coordinate with t.task explicitly.

如果返回true,则 timer 被停止了,如果返回 false,则 timer 已经过期了或已经被停止了。我的理解是,无论返回true还是false,都说明这个周期任务已经不会被触发了,这是我单单只看这个Stop()文档的理解。

但是我仔细的读了下 Stop()的源码:

func (t *Timer) Stop() bool {
	stopped := false
	for b := t.getBucket(); b != nil; b = t.getBucket() {
		// If b.Remove is called just after the timing wheel's goroutine has:
		//     1. removed t from b (through b.Flush -> b.remove)
		//     2. moved t from b to another bucket ab (through b.Flush -> b.remove and ab.Add)
		// this may fail to remove t due to the change of t's bucket.
		stopped = b.Remove(t)

		// Thus, here we re-get t's possibly new bucket (nil for case 1, or ab (non-nil) for case 2),
		// and retry until the bucket becomes nil, which indicates that t has finally been removed.
	}
	return stopped
}

以上代码中,我的理解是,只要 timer 还可以取到 bucket, 那就要从 bucket 去 remove 这个 timer,直到 timer 无法取到 bucket (这就说明 这个timer已经从时间轮中被移出,对应的函数也不会再被触发了,也就是 停止成功, 返回 false),所以的确和上面的描述是一致的。。

然后我就看不懂这里为什么要加个 for 循环

for !t.Stop() {
    }

然后注释说 since it will be restarted again and again., 如果 t.Stop() 返回 false 的话,那这里不就无限循环了,,为什么不直接 t.Stop() 嘞,反正按照 Stop() 的逻辑,只要 Stop() 执行完了,t 就不会在fire了。

所以我不太明白这里代码 for 循环的意义何在?希望大佬可以指导一下 @RussellLuo

TimingWheel中currentTime和overflowWheel变量存在竞争问题

TimingWheel结构体中currentTime变量同时为多个goroutine访问,其中 add方法是只读,addvanceClock则是先去读currentTime变量,再判断,然后写入。使用了atomic只保证了读写currentTime变量那一个操作原子化,但是advanceClock则是先读再判断最后再写入, 会与add函数发生竞争,同理overflow变量也是一样好。粗略看了一下,kafka源码里面采用了是读写锁临时保护了这些变量

计算tw.buckets索引的问题

在timingwheel.go的add方法中
b := tw.buckets[virtualID%tw.wheelSize]

这句我觉得应该是 b := tw.buckets[virtualID*tw.tick%tw.wheelSize], 这样算出来的桶索引才是对的

时间轮是不是在go的版本1.14之后与timer对比就没有优势了?

时间轮是不是在go的版本1.14之后与timer对比就没有优势了?

附上benchmark:

goos: darwin
goarch: amd64
pkg: gin-test/api/main
BenchmarkTimingWheel_StartStop/N-1m-12   4582120               254 ns/op              85 B/op          2 allocs/op
BenchmarkTimingWheel_StartStop/N-5m-12   3356630               427 ns/op              46 B/op          1 allocs/op
BenchmarkTimingWheel_StartStop/N-10m-12                  2474842               483 ns/op              60 B/op          1 allocs/op
BenchmarkStandardTimer_StartStop/N-1m-12                 6777975               179 ns/op              84 B/op          1 allocs/op
BenchmarkStandardTimer_StartStop/N-5m-12                 6431217               231 ns/op              85 B/op          1 allocs/op
BenchmarkStandardTimer_StartStop/N-10m-12                5374492               266 ns/op              83 B/op          1 allocs/op
PASS
ok      gin-test/api/main       60.414s

添加定时器异常

panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x20 pc=0x9cecc5]
goroutine 35650 [running]:
github.com/lonerwolf/danyun-service/server.(*ClientSession).GetMessage.func2()
/home/lonerwolf/danyun-service/server/client_session.go:71 +0x45
created by github.com/RussellLuo/timingwheel.(*TimingWheel).addOrRun
/home/lonerwolf/go/pkg/mod/github.com/!russell!luo/[email protected]/timingwheel.go:117 +0x60

源代码
session.HeartTimeoutTimer.Stop()
session.HeartTimeoutTimer = session.timingWheel.AfterFunc(time.Second*300, func() {
log.Debugf("CloseSession by HeartTimeoutTimer RemoteAddr:%s sessionID:%d", session.conn.RemoteAddr().String(), session.SessionID)
session.Close()
})
AfterFunc 出错,这是为啥

Add support for resetting a timer

Motivation

Implement something like Timer.Reset, to reuse an existing timer and reset its duration.

Solutions

There are three candidate solutions:

1. Add Timer.Reset

func (t *Timer) Reset(d time.Duration) {
    ...
}

Pros:

  • The API is similar to Timer.Reset, which is simple and intuitive.

Cons:

  • Since Reset needs to add the timer back into the timing wheel, the timer must know which timing wheel it belongs to.
  • To achieve this goal, we need to add a new field (e.g. tw *timingwheel.TimingWheel) into the Timer struct. And this is a waste of memory since the total amount of timers is often large.

2. Add TimingWheel.ResetTimer

func (tw *TimingWheel) ResetTimer(t *Timer, d time.Duration) {
    ...
}

Pros:

  • There is no waste of memory.

Cons:

  • The API is simple but not as intuitive as Timer.Reset.

3. Modify TimingWheel.AfterFunc

Add one more argument named t to provide an existing timer. If t is not nil, we reuse it. Otherwise, we create a new timer.

func (tw *TimingWheel) AfterFunc(d time.Duration, f func(), t *Timer) *Timer {
    ...
}

Pros:

  • There is no waste of memory.
  • There is only one API AfterFunc, no need to add a new one.

Cons:

  • AfterFunc becomes more complex.

请教添加任务时的解锁时机问题

bucket.go中,添加任务时Add函数,它的解锁操作是在末尾
为什么不考虑在添加任务至timers双向链表后立马解锁呢?我的理解是bucket这个锁就是为了保护 timers是一个并发不安全的结构
@RussellLuo

func (b *bucket) Add(t *Timer) {
b.mu.Lock()

e := b.timers.PushBack(t)
    // b.mu.Unlock() 为什么不在这个位置进行解锁?
t.setBucket(b)
t.element = e

b.mu.Unlock()

}

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.