
zenq's Introduction

ZenQ

A low-latency thread-safe queue in golang implemented using a lock-free ringbuffer and runtime internals

Based on the LMAX Disruptor Pattern

Features

  • Much faster than native channels in both SPSC (single-producer-single-consumer) and MPSC (multi-producer-single-consumer) modes in terms of time/op
  • More resource efficient in terms of memory_allocation/op and num_allocations/op, which is especially evident when benchmarking large batch-size inputs
  • Handles the case where NUM_WRITER_GOROUTINES > NUM_CPU_CORES much better than native channels
  • Selection from multiple ZenQs, just like golang's select{}, ensuring fair selection and no starvation
  • Closing a ZenQ

Benchmarks to support the above claims here

Installation

You need Golang 1.19.x or above

$ go get github.com/alphadose/zenq/v2

Usage

  1. Simple Read/Write
package main

import (
	"fmt"

	"github.com/alphadose/zenq/v2"
)

type payload struct {
	alpha int
	beta  string
}

func main() {
	zq := zenq.New[payload](10)

	for j := 0; j < 5; j++ {
		go func() {
			for i := 0; i < 20; i++ {
				zq.Write(payload{
					alpha: i,
					beta:  fmt.Sprint(i),
				})
			}
		}()
	}

	for i := 0; i < 100; i++ {
		if data, queueOpen := zq.Read(); queueOpen {
			fmt.Printf("%+v\n", data)
		}
	}
}
  2. Selection from multiple ZenQs just like golang's native select{}. The selection process is fair, i.e. no single ZenQ gets starved
package main

import (
	"fmt"

	"github.com/alphadose/zenq/v2"
)

type custom1 struct {
	alpha int
	beta  string
}

type custom2 struct {
	gamma int
}

const size = 100

var (
	zq1 = zenq.New[int](size)
	zq2 = zenq.New[string](size)
	zq3 = zenq.New[custom1](size)
	zq4 = zenq.New[*custom2](size)
)

func main() {
	go looper(intProducer)
	go looper(stringProducer)
	go looper(custom1Producer)
	go looper(custom2Producer)

	for i := 0; i < 40; i++ {

		// Selection occurs here
		if data := zenq.Select(zq1, zq2, zq3, zq4); data != nil {
			switch data.(type) {
			case int:
				fmt.Printf("Received int %d\n", data)
			case string:
				fmt.Printf("Received string %s\n", data)
			case custom1:
				fmt.Printf("Received custom data type number 1 %#v\n", data)
			case *custom2:
				fmt.Printf("Received pointer %#v\n", data)
			}
		}
	}
}

func intProducer(ctr int) { zq1.Write(ctr) }

func stringProducer(ctr int) { zq2.Write(fmt.Sprint(ctr * 10)) }

func custom1Producer(ctr int) { zq3.Write(custom1{alpha: ctr, beta: fmt.Sprint(ctr)}) }

func custom2Producer(ctr int) { zq4.Write(&custom2{gamma: 1 << ctr}) }

func looper(producer func(ctr int)) {
	for i := 0; i < 10; i++ {
		producer(i)
	}
}
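  3. Closing a ZenQ. A minimal sketch, assuming the v2 API exposes zq.Close() and that Read() reports queueOpen == false once the queue has been closed (check the package documentation for the exact semantics)
package main

import (
	"fmt"

	"github.com/alphadose/zenq/v2"
)

func main() {
	zq := zenq.New[int](10)

	go func() {
		for i := 0; i < 5; i++ {
			zq.Write(i)
		}
		// Assumption: Close() marks the queue as closed so the reader can stop.
		zq.Close()
	}()

	// Drain until Read() reports that the queue is no longer open.
	for {
		data, queueOpen := zq.Read()
		if !queueOpen {
			break
		}
		fmt.Println(data)
	}
}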

Benchmarks

Benchmarking code available here

Note that if you run the benchmarks with the -race flag, ZenQ will perform slower because the -race flag slows down atomic operations in golang. Under normal circumstances, ZenQ will outperform golang native channels.

Hardware Specs

❯ neofetch
OS: macOS 12.3 21E230 arm64
Host: MacBookAir10,1
Kernel: 21.4.0
Uptime: 6 hours, 41 mins
Packages: 86 (brew)
Shell: zsh 5.8
Resolution: 1440x900
DE: Aqua
WM: Rectangle
Terminal: iTerm2
Terminal Font: FiraCodeNerdFontComplete-Medium 16 (normal)
CPU: Apple M1
GPU: Apple M1
Memory: 1370MiB / 8192MiB

Terminology

  • NUM_WRITERS -> The number of goroutines concurrently writing to ZenQ/Channel
  • INPUT_SIZE -> The number of input payloads to be passed through ZenQ/Channel from producers to consumer

Computed via benchstat from 30 runs of each benchmark, using go test -benchmem -bench=. benchmarks/simple/*.go

name                                     time/op
_Chan_NumWriters1_InputSize600-8          23.2µs ± 1%
_ZenQ_NumWriters1_InputSize600-8          17.9µs ± 1%
_Chan_NumWriters3_InputSize60000-8        5.27ms ± 3%
_ZenQ_NumWriters3_InputSize60000-8        2.36ms ± 2%
_Chan_NumWriters8_InputSize6000000-8       671ms ± 2%
_ZenQ_NumWriters8_InputSize6000000-8       234ms ± 6%
_Chan_NumWriters100_InputSize6000000-8     1.59s ± 4%
_ZenQ_NumWriters100_InputSize6000000-8     309ms ± 2%
_Chan_NumWriters1000_InputSize7000000-8    1.97s ± 0%
_ZenQ_NumWriters1000_InputSize7000000-8    389ms ± 4%
_Chan_Million_Blocking_Writers-8           10.4s ± 2%
_ZenQ_Million_Blocking_Writers-8           2.32s ±21%

name                                     alloc/op
_Chan_NumWriters1_InputSize600-8           0.00B
_ZenQ_NumWriters1_InputSize600-8           0.00B
_Chan_NumWriters3_InputSize60000-8          109B ±68%
_ZenQ_NumWriters3_InputSize60000-8        24.6B ±107%
_Chan_NumWriters8_InputSize6000000-8       802B ±241%
_ZenQ_NumWriters8_InputSize6000000-8     1.18kB ±100%
_Chan_NumWriters100_InputSize6000000-8    44.2kB ±41%
_ZenQ_NumWriters100_InputSize6000000-8    10.7kB ±38%
_Chan_NumWriters1000_InputSize7000000-8    476kB ± 8%
_ZenQ_NumWriters1000_InputSize7000000-8   90.6kB ±10%
_Chan_Million_Blocking_Writers-8           553MB ± 0%
_ZenQ_Million_Blocking_Writers-8           122MB ± 3%

name                                     allocs/op
_Chan_NumWriters1_InputSize600-8            0.00
_ZenQ_NumWriters1_InputSize600-8            0.00
_Chan_NumWriters3_InputSize60000-8          0.00
_ZenQ_NumWriters3_InputSize60000-8          0.00
_Chan_NumWriters8_InputSize6000000-8       2.76 ±190%
_ZenQ_NumWriters8_InputSize6000000-8        5.47 ±83%
_Chan_NumWriters100_InputSize6000000-8       159 ±26%
_ZenQ_NumWriters100_InputSize6000000-8      25.1 ±39%
_Chan_NumWriters1000_InputSize7000000-8    1.76k ± 6%
_ZenQ_NumWriters1000_InputSize7000000-8     47.3 ±31%
_Chan_Million_Blocking_Writers-8           2.00M ± 0%
_ZenQ_Million_Blocking_Writers-8           1.00M ± 0%

The above results show that ZenQ is more efficient than channels in all 3 metrics, i.e. time/op, mem_alloc/op and num_allocs/op, for the following tested cases (a sketch of such an MPSC run follows this list):

  1. SPSC
  2. MPSC with NUM_WRITER_GOROUTINES < NUM_CPU_CORES
  3. MPSC with NUM_WRITER_GOROUTINES > NUM_CPU_CORES
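
For reference, here is a minimal sketch of the shape of such an MPSC run (an assumption about the harness structure, not the linked benchmark code): NUM_WRITERS producer goroutines split INPUT_SIZE writes between them while a single consumer reads everything back.

package main

import (
	"sync"

	"github.com/alphadose/zenq/v2"
)

const (
	numWriters = 8       // NUM_WRITERS
	inputSize  = 6000000 // INPUT_SIZE
)

func main() {
	zq := zenq.New[int](4096)

	var wg sync.WaitGroup
	wg.Add(numWriters)
	for w := 0; w < numWriters; w++ {
		go func() {
			defer wg.Done()
			// Each producer writes its share of the total input.
			for i := 0; i < inputSize/numWriters; i++ {
				zq.Write(i)
			}
		}()
	}

	// Single consumer reads back every payload that was written.
	for i := 0; i < inputSize; i++ {
		zq.Read()
	}
	wg.Wait()
}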

Cherry on the Cake

In SPSC mode, ZenQ is faster than channels by roughly 92 seconds for an input size of 6 * 10^8 elements

❯ go run benchmarks/simple/main.go

With Input Batch Size: 60 and Num Concurrent Writers: 1

Native Channel Runner completed transfer in: 26.916µs
ZenQ Runner completed transfer in: 20.292µs
====================================================================

With Input Batch Size: 600 and Num Concurrent Writers: 1

Native Channel Runner completed transfer in: 135.75µs
ZenQ Runner completed transfer in: 105.792µs
====================================================================

With Input Batch Size: 6000 and Num Concurrent Writers: 1

Native Channel Runner completed transfer in: 2.100209ms
ZenQ Runner completed transfer in: 510.792µs
====================================================================

With Input Batch Size: 6000000 and Num Concurrent Writers: 1

Native Channel Runner completed transfer in: 1.241481917s
ZenQ Runner completed transfer in: 226.068209ms
====================================================================

With Input Batch Size: 600000000 and Num Concurrent Writers: 1

Native Channel Runner completed transfer in: 1m55.074638875s
ZenQ Runner completed transfer in: 22.582667917s
====================================================================

zenq's People

Contributors

alphadose, aoang, egonelbre

zenq's Issues

Master/Main branch borked?

Simple clone of the current main branch on Go version 1.19.4.

Maybe I'm missing some hidden snowflake?

ZenQ git:(main) go run examples/simple/main.go
# github.com/alphadose/zenq/v2
./zenq.go:94:6: array length constants.CacheLinePadSize - unsafe.Sizeof(selectFactory[T]{}) (value of type uintptr) must be constant
./select_list.go:27:4: n.threadPtr undefined (type *node has no field or method threadPtr)
./select_list.go:27:17: n.dataOut undefined (type *node has no field or method dataOut)
./select_list.go:28:4: n.next undefined (type *node has no field or method next)
./select_list.go:35:6: invalid recursive type node
        ./select_list.go:35:6: node refers to
        ./select_list.go:36:18: "sync/atomic".Pointer refers to
        ./select_list.go:35:6: node
./select_list.go:47:4: n.threadPtr undefined (type *node has no field or method threadPtr)
./select_list.go:47:17: n.dataOut undefined (type *node has no field or method dataOut)
./select_list.go:50:15: tail.next undefined (type *node has no field or method next)
./select_list.go:53:13: tail.next undefined (type *node has no field or method next)
./thread_parker.go:25:18: invalid recursive type "sync/atomic".Pointer
        ./thread_parker.go:25:18: "sync/atomic".Pointer refers to
        ./thread_parker.go:25:27: parkSpot refers to
        ./thread_parker.go:25:18: "sync/atomic".Pointer
./select_list.go:53:13: too many errors

How do I get the sequence number from a ring ZenQ?

Hello. I have more questions following #10. Assume I created three or more ZenQ rings/instances and want to balance load across them when the producer delivers a message, by selecting the ring with the lowest sequence. Could you suggest how I can retrieve the length of a ring or the most recent sequence number from a ZenQ?

FYI. I'm new to LMAX Disruptor. Please excuse any misunderstanding.

More Benchmarks for your feedback

Hi, here is some feedback for you, with results on a different architecture:
Ubuntu 22.04
AMD Ryzen Threadripper 3960X
128 GB RAM

Simple

~/go/pkg/mod/github.com/alphadose/[email protected]$ go run benchmarks/simple/main.go
With Input Batch Size: 60 and Num Concurrent Writers: 1

Native Channel Runner completed transfer in: 11.462µs
ZenQ Runner completed transfer in: 4.739µs
====================================================================

With Input Batch Size: 600 and Num Concurrent Writers: 1

Native Channel Runner completed transfer in: 32.522µs
ZenQ Runner completed transfer in: 28.795µs
====================================================================

With Input Batch Size: 6000 and Num Concurrent Writers: 1

Native Channel Runner completed transfer in: 449.323µs
ZenQ Runner completed transfer in: 254.583µs
====================================================================

With Input Batch Size: 6000000 and Num Concurrent Writers: 1

Native Channel Runner completed transfer in: 365.257669ms
ZenQ Runner completed transfer in: 447.14544ms
====================================================================

With Input Batch Size: 600000000 and Num Concurrent Writers: 1

Native Channel Runner completed transfer in: 38.346896012s
ZenQ Runner completed transfer in: 48.59434176s
====================================================================

Selector

Chan Select Runner completed transfer in: 4m32.220766401s
ZenQ Select Runner did not complete transfer after 30min+ (tested both Ubuntu and Windows)

Questions regarding struct field order

Could you please explain:

  1. Why is there padding at the beginning of the ZenQ structure?
	// ZenQ is the CPU cache optimized ringbuffer implementation
	ZenQ[T any] struct {
		// The padding members 0 to 4 below are here to ensure each item is on a separate cache line.
		// This prevents false sharing and hence improves performance.
		_           cacheLinePadding
		writerIndex atomic.Uint32
		_           [constants.CacheLinePadSize - unsafe.Sizeof(atomic.Uint32{})]byte
		readerIndex atomic.Uint32
		_           [constants.CacheLinePadSize - unsafe.Sizeof(atomic.Uint32{})]byte
		metaQ
		_ [constants.CacheLinePadSize - unsafe.Sizeof(metaQ{})]byte
		selectFactory[T]
		_ [constants.CacheLinePadSize - unsafe.Sizeof(selectFactory[T]{})]byte
	}
  2. The ordering of the slot type: I think that on a 64-bit system, 32 bits would be wasted when the item is smaller than 32 bits, since the 64-bit pointer must be 64-bit aligned. Moving writeParker to the top of the structure would solve the issue.
slot[T any] struct {
		atomic.Uint32
		writeParker *ThreadParker[T]
		item        T
	}

*zenq.ZenQ.Read() eats 100% CPU

I have a goroutine reading values from a ZenQ queue in a for loop, but the Read() function uses the whole CPU.

When I comment out these Read() calls and recompile, the CPU usage drops to zero.

I tried profiling this program and looked at the top 10 functions by cumulative time.

Please tell me how to solve this problem, thank you!

Is Select() providing the same value twice, or am I missing something here?

I had a section of code which pulls a value from a ZenQ queue:

		if dat := zenq.Select(b.notifyQ, b.writeQ); dat != nil {
			switch dat.(type) {
			case *bounceNofity:
				switch dat.(*bounceNofity).event {
				case bShutdown:
					model_debugf("BounceMap: writerLoop got shutdown")
					shutdown = true
				case bShutdownnow:
					break writerLoop
				}
			case *Bounce:
				bnc := dat.(*Bounce)
				if bnc.writecb != nil {
					bnc.writecb(bnc)
				}
				if bnc.writecb2 != nil {
					bnc.writecb2(bnc)
				}
			}

When doing a test with 1000 concurrent "Bounce" pointers coming into the ZenQ concurrently, I get a handful of duplicate reads from it.

When I switch the ZenQ over to a normal channel, I don't have this issue:

		select {
		case bnc := <-b.writeQ:
			if bnc.writecb != nil {
				bnc.writecb(bnc)
			}
			if bnc.writecb2 != nil {
				bnc.writecb2(bnc)
			}
		case ev := <-b.notifyQ:
			switch ev.event {
			case bShutdown:
				model_debugf("BounceMap: writerLoop got shutdown")
				shutdown = true
			case bShutdownnow:
				break writerLoop
			}
		}

I provided 4 source files to completely reproduce the test in question: TestBounceMap1000

I may well have a bug in my code, or I may not understand something fundamental about ZenQ, but by switching to channels I don't have the issue.

go test -tags model_debug -v -run TestBounceMap

CPU 100%

The CPU usage is 100% even when no data is being processed.

fatal error: casgstatus: waiting for Gwaiting but is Grunnable

A fatal error occurs during benchmarking

fatal error: casgstatus: waiting for Gwaiting but is Grunnable

Benchmark Code: https://github.com/lemon-mint/golang-q-benchmark
go version go1.18.4 windows/amd64

goos: windows
goarch: amd64
pkg: github.com/lemon-mint/golang-q-benchmark
cpu: Intel(R) Core(TM) i7-4790K CPU @ 4.00GHz
BenchmarkZenQ-8   	fatal error: casgstatus: waiting for Gwaiting but is Grunnable

runtime stack:
runtime.throw({0x930baf?, 0xc000084000?})
	C:/Program Files/Go/src/runtime/panic.go:992 +0x76
runtime.casgstatus(0xc0002844e0, 0x4, 0x1)
	C:/Program Files/Go/src/runtime/proc.go:978 +0x385
runtime.ready(0xc0002844e0, 0xc0002844e0?, 0x10?)
	C:/Program Files/Go/src/runtime/proc.go:857 +0x71
runtime.goready.func1()
	C:/Program Files/Go/src/runtime/proc.go:372 +0x26
runtime.systemstack()
	C:/Program Files/Go/src/runtime/asm_amd64.s:469 +0x4e

goroutine 87 [running]:
runtime.systemstack_switch()
	C:/Program Files/Go/src/runtime/asm_amd64.s:436 fp=0xc0003f1df8 sp=0xc0003f1df0 pc=0x8531e0
runtime.goready(0x85df1e?, 0xc00040fce0?)
	C:/Program Files/Go/src/runtime/proc.go:371 +0x47 fp=0xc0003f1e28 sp=0xc0003f1df8 pc=0x82a547
github.com/alphadose/zenq/v2.safe_ready(0x8f1785?)
	G:/go/pkg/mod/github.com/alphadose/zenq/[email protected]/lib_runtime_linkage.go:176 +0x54 fp=0xc0003f1e50 sp=0xc0003f1e28 pc=0x8efe54
github.com/alphadose/zenq/v2.(*ThreadParker[...]).Ready(0xc000442860?)
	G:/go/pkg/mod/github.com/alphadose/zenq/[email protected]/thread_parker.go:64 +0x95 fp=0xc0003f1eb8 sp=0xc0003f1e50 pc=0x8f1855
github.com/alphadose/zenq/v2.(*ZenQ[...]).Read(0xc000072640)
	G:/go/pkg/mod/github.com/alphadose/zenq/[email protected]/zenq.go:188 +0xfa fp=0xc0003f1f20 sp=0xc0003f1eb8 pc=0x8f1a3a
github.com/lemon-mint/golang-q-benchmark.BenchmarkZenQ.func1(0xc00020c020)
	g:/work/golang-q-benchmark/main_test.go:24 +0xb8 fp=0xc0003f1f80 sp=0xc0003f1f20 pc=0x8f0958
testing.(*B).RunParallel.func1()
	C:/Program Files/Go/src/testing/benchmark.go:788 +0xcb fp=0xc0003f1fe0 sp=0xc0003f1f80 pc=0x8ab0cb
runtime.goexit()
	C:/Program Files/Go/src/runtime/asm_amd64.s:1571 +0x1 fp=0xc0003f1fe8 sp=0xc0003f1fe0 pc=0x855541
created by testing.(*B).RunParallel
	C:/Program Files/Go/src/testing/benchmark.go:781 +0x105

goroutine 1 [chan receive]:
testing.(*B).doBench(0xc00013c240)
	C:/Program Files/Go/src/testing/benchmark.go:285 +0x7f
testing.(*benchContext).processBench(0xc0000040f0, 0x238?)
	C:/Program Files/Go/src/testing/benchmark.go:589 +0x3aa
testing.(*B).run(0xc00013c240?)
	C:/Program Files/Go/src/testing/benchmark.go:276 +0x67
testing.(*B).Run(0xc00013c000, {0x9280ec?, 0x2f82bb486508?}, 0x932a68)
	C:/Program Files/Go/src/testing/benchmark.go:677 +0x453
testing.runBenchmarks.func1(0xc00013c000?)
	C:/Program Files/Go/src/testing/benchmark.go:550 +0x6e
testing.(*B).runN(0xc00013c000, 0x1)
	C:/Program Files/Go/src/testing/benchmark.go:193 +0x102
testing.runBenchmarks({0x92f7db, 0x28}, 0xa69b60?, {0xa0f3e0, 0x4, 0x40?})
	C:/Program Files/Go/src/testing/benchmark.go:559 +0x3f2
testing.(*M).Run(0xc0000701e0)
	C:/Program Files/Go/src/testing/testing.go:1726 +0x811
main.main()
	_testmain.go:53 +0x1aa
