Giter Club home page Giter Club logo

delayedqueue's Introduction

Delayed Queue

README_en

轻量延迟队列,使用 Redis(REdis Serialization Protocal) 协议,可通过 Redis 客户端使用

动机

曾使用 Rabbitmq 的延迟队列插件,数据量达百万后性能急剧下降,占用大量内存。 未发现基于磁盘的实现,本项目因此诞生。

使用示例

package main

import (
	"context"
	"fmt"
	"math/rand"
	"strconv"
	"time"

	"github.com/go-redis/redis/v8"
)

func sendMsgs() {
	client := redis.NewClient(&redis.Options{})
	defer client.Close()
	ctx := context.TODO()
	for j := 0; j < 1000; j++ {
		millis := rand.Int31n(1000 * 60)
		millis += 1000
		data := "Value " + time.Now().Add(time.Duration(millis)*time.Millisecond).String()
		// 通过 sadd 命令发送消息, 延迟时长单位为毫秒
		// sadd 队列名 消息体 延迟时长
		client.SAdd(ctx, "queue_name", data, strconv.Itoa(int(millis)))
	}
}

func pollMsgs(done <-chan struct{}) {
	client := redis.NewClient(&redis.Options{})
	defer client.Close()
	ctx := context.TODO()
	batchSize := int64(1000)
	for {
		select {
		case <-done:
			break
		default:
			// 通过 spop 命令消费消息,若没有到期消息,返回空
			m := client.SPopN(ctx, "queue_name", batchSize)
			if redis.Nil == m.Err() || len(m.Val()) == 0 {
				time.Sleep(time.Millisecond * 300)
				continue
			}
			msgs := m.Val()
			for _, msg := range msgs {
				fmt.Println(msg)
			}
		}
	}
}

具体使用

  1. 编译: go build -o delayedqueue ./server
  2. 配置,配置文件示例位于 config.toml:
port = 6379
engine = "badger"

[badger]
path = "./queue_data"
  1. 运行 ./delayedqueue -c config.toml
  2. 通过 Redis 客户端连接
redis-cli
  1. 发送延迟消息:
> sadd my_queue data_delays_10_seconds 10000
> sadd my_queue data_delays_1_seconds 1000
> sadd my_queue data_delays_5_seconds 5000
  1. 消费消息:
# sleep 6 seconds
> spop my_queue 10
1) "data_delays_1_seconds"
2) "data_delays_5_seconds"
# sleep 4 seconds
> spop my_queue 10
1) "data_delays_10_seconds"

获取队列列表:

> keys
1) my_queue

TODOs

  • 单元测试
  • 基于 Badger 的实现
  • 基于 TiKV 的实现
  • 生产消息: sadd
  • 批量生产消息: sadd
  • 消费消息: spop
  • 获取队列列表: keys
  • 删除队列: del
  • 查询队列长度: scard
  • 接入 Prometheus
  • 接入 gops

已知问题

  1. 消费消息时,spop 操作最差情况下耗时达 200ms, 批量消费的 过小会影响吞吐量。
  2. 目前没有类似 Kaflka 、 Rabbitmq 等队列消费成功后 提交 的机制, 消息最多消费一次,消费过程失败的情况需要消费者自行处理。

delayedqueue's People

Contributors

liyiheng avatar

Stargazers

 avatar  avatar

Watchers

 avatar  avatar

Forkers

wusuluren isgasho

delayedqueue's Issues

支持默认的过期时间

鉴于实际使用时,某种场景下一般过期时间是固定的,比如订单超时等,考虑支持配置默认的过期时间。这样Sadd命令可以去掉过期时间参数,使用默认值,就可以批量操作了,并且使用Zadd允许单独设置过期时间。

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.