Name

lua-resty-rocketmq - Lua RocketMQ client driver for ngx_lua, based on the cosocket API

Status

Production ready.

Description

This Lua library is a RocketMQ client driver for the ngx_lua nginx module.

This Lua library takes advantage of ngx_lua's cosocket API, which ensures 100% nonblocking behavior.

Quick start

Install OpenResty

For Ubuntu:

wget -O - https://openresty.org/package/pubkey.gpg | sudo apt-key add -
echo "deb http://openresty.org/package/ubuntu $(lsb_release -sc) main" \
    | sudo tee /etc/apt/sources.list.d/openresty.list
sudo apt-get update
sudo apt-get -y install openresty

See https://openresty.org/cn/linux-packages.html for other distributions.

Install and start RocketMQ

wget https://archive.apache.org/dist/rocketmq/4.9.3/rocketmq-all-4.9.3-bin-release.zip
unzip rocketmq-all-4.9.3-bin-release.zip
cd rocketmq-4.9.3
nohup bash bin/mqnamesrv &
nohup bash bin/mqbroker -n localhost:9876 -c conf/broker.conf &

Run examples of this project

cd examples
chmod +x producer.lua
./producer.lua

Synopsis

    lua_package_path "/path/to/lua-resty-rocketmq/lib/?.lua;;";

    server {
        location /test {
            content_by_lua_block {
                local cjson = require "cjson"
                local producer = require "resty.rocketmq.producer"
                local consumer = require "resty.rocketmq.consumer"

                local nameservers = { "127.0.0.1:9876" }

                local message = "halo world"

                local p = producer.new(nameservers, "produce_group")
                
                -- set acl
                local aclHook = require("resty.rocketmq.acl_rpchook").new("RocketMQ", "12345678")
                p:addRPCHook(aclHook)
                
                -- use tls mode
                p:setUseTLS(true)
                
                local res, err = p:send("TopicTest", message)
                if not res then
                    ngx.say("send err:", err)
                    return
                end
                ngx.say("send success")
                
                -- consume
                local c = consumer.new(nameservers, "group1")
                c:subscribe("TopicTest", "*")
                c:registerMessageListener({
                    consumeMessage = function(self, msgs, context)
                        ngx.say("consume success:", cjson.encode(msgs))
                        return consumer.CONSUME_SUCCESS
                    end
                })
                
                c:start()
                ngx.sleep(5)
                c:stop()
            }
        }
    }

Modules

resty.rocketmq.producer

To load this module:

    local producer = require "resty.rocketmq.producer"

Methods

new

syntax: p = producer.new(nameservers, produce_group, enableMsgTrace, customizedTraceTopic)

nameservers is a list of nameserver addresses, for example { "127.0.0.1:9876" }.

addRPCHook

syntax: p:addRPCHook(hook)

hook is a table that contains two functions as follows:

  • doBeforeRequest(self, addr, header, body)
  • doAfterResponse(self, addr, header, body, respHeader, respBody)

An ACL hook is provided. Usage:

    local accessKey, secretKey = "RocketMQ", "12345678"
    local aclHook = require("resty.rocketmq.acl_rpchook").new(accessKey, secretKey)
    p:addRPCHook(aclHook)
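
Besides the bundled ACL hook, any table with the two functions listed above can serve as a hook. The following is a minimal sketch of a custom hook that only records traffic; new_logging_hook and its requests/responses fields are hypothetical names used for illustration, not part of the library.

```lua
-- Hypothetical helper: builds a hook that records request addresses
-- and counts responses. Only the two function names come from the README.
local function new_logging_hook()
    local hook = { requests = {}, responses = 0 }

    -- Invoked before a request is sent to addr.
    function hook.doBeforeRequest(self, addr, header, body)
        self.requests[#self.requests + 1] = addr
    end

    -- Invoked after the response from addr has been received.
    function hook.doAfterResponse(self, addr, header, body, respHeader, respBody)
        self.responses = self.responses + 1
    end

    return hook
end

local logging_hook = new_logging_hook()
-- With a producer p created as in the Synopsis:
--   p:addRPCHook(logging_hook)
```

Keeping the state on the hook table itself means each producer can get its own independent hook instance.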

setUseTLS

syntax: p:setUseTLS(useTLS)

useTLS is a boolean

setTimeout

syntax: p:setTimeout(timeout)

timeout is in milliseconds, default 3000

registerSendMessageHook

syntax: p:registerSendMessageHook(hook)

hook is a table that contains two functions as follows:

  • sendMessageBefore(self, context)
  • sendMessageAfter(self, context)

context is a table that contains:

  • producer
  • producerGroup
  • communicationMode
  • bornHost
  • brokerAddr
  • message
  • mq
  • msgType
  • sendResult
  • exception
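
As an illustration of the hook shape described above, here is a small sketch that counts send attempts and keeps one summary line per send. The send_stats table is a hypothetical name, and the sketch assumes that context.exception is nil when a send succeeds, which the README does not state.

```lua
-- Hypothetical collector for send statistics.
local send_stats = { attempts = 0, log = {} }

local send_hook = {
    -- Runs before each send; here it only counts attempts.
    sendMessageBefore = function(self, context)
        send_stats.attempts = send_stats.attempts + 1
    end,

    -- Runs after each send; assumes context.exception is nil on success.
    sendMessageAfter = function(self, context)
        local status = context.exception and "failed" or "ok"
        send_stats.log[#send_stats.log + 1] = string.format(
            "group=%s broker=%s status=%s",
            tostring(context.producerGroup),
            tostring(context.brokerAddr),
            status)
    end,
}

-- With a producer p created as in the Synopsis:
--   p:registerSendMessageHook(send_hook)
```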

registerEndTransactionHook

syntax: p:registerEndTransactionHook(hook)

hook is a table that contains a function as follows:

  • endTransaction(self, context)

context is a table that contains:

  • producerGroup
  • brokerAddr
  • message
  • msgId
  • transactionId
  • transactionState
  • fromTransactionCheck
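
A hook of this shape could, for example, tally how transactions end. The sketch below makes no assumption about the type of context.transactionState and simply groups by its string form; state_counts is a hypothetical name.

```lua
-- Hypothetical tally of end-transaction events, keyed by the string
-- form of context.transactionState.
local state_counts = {}

local end_tx_hook = {
    endTransaction = function(self, context)
        local key = tostring(context.transactionState)
        state_counts[key] = (state_counts[key] or 0) + 1
    end,
}

-- With a producer p created as in the Synopsis:
--   p:registerEndTransactionHook(end_tx_hook)
```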

send

syntax: res, err = p:send(topic, message, tags, keys, properties)

properties is a table that contains:

  • waitStoreMsgOk
  • delayTimeLevel

In case of success, returns a table of results. In case of errors, returns nil with a string describing the error.
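
The nil-plus-error convention can be wrapped in a small helper. This is a sketch under assumptions: send_delayed is a hypothetical name, the value types chosen for waitStoreMsgOk (boolean) and delayTimeLevel (number) are guesses since the README does not specify them, and tags and keys are passed as nil in the same way the Synopsis omits them.

```lua
-- Hypothetical wrapper around p:send that sets a delay level and
-- adds context to the error message. p is any object exposing
-- send(self, topic, message, tags, keys, properties).
local function send_delayed(p, topic, body, level)
    local properties = {
        waitStoreMsgOk = true,   -- assumed to be a boolean
        delayTimeLevel = level,  -- assumed to be a numeric level
    }
    local res, err = p:send(topic, body, nil, nil, properties)
    if not res then
        return nil, "send to " .. topic .. " failed: " .. tostring(err)
    end
    return res
end

-- With a producer p created as in the Synopsis:
--   local res, err = send_delayed(p, "TopicTest", "halo world", 3)
```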

setTransactionListener

syntax: res, err = p:setTransactionListener(transactionListener)

transactionListener is a table that contains two functions as follows:

  • executeLocalTransaction(self, msg, arg)
  • checkLocalTransaction(self, msg)

sendMessageInTransaction

syntax: res, err = p:sendMessageInTransaction(topic, arg, message, tags, keys, properties)

batchSend

syntax: res, err = p:batchSend(msgs)

msgs is a list of messages; each message is a table that contains:

  • topic
  • body
  • tags
  • keys
  • properties
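
A list in this shape can be assembled with a short helper. The sketch below is illustrative only: build_batch and the "order-N" key format are hypothetical, and it puts every message on the same topic, which is what RocketMQ's Java client requires for batches (whether this library enforces the same rule is not stated here).

```lua
-- Hypothetical helper: turns a list of bodies into the msgs list
-- expected by p:batchSend, all on one topic with the same tags.
local function build_batch(topic, bodies, tags)
    local msgs = {}
    for i, body in ipairs(bodies) do
        msgs[i] = {
            topic = topic,
            body = body,
            tags = tags,
            keys = "order-" .. i,  -- illustrative key format
        }
    end
    return msgs
end

local msgs = build_batch("TopicTest", { "first", "second" }, "tagA")
-- With a producer p created as in the Synopsis:
--   local res, err = p:batchSend(msgs)
```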

start

syntax: p:start()

Note that if you don't call p:start() before sending, messages are still sent successfully, but their trace data is not sent.

stop

syntax: p:stop()

resty.rocketmq.consumer

To load this module:

    local consumer = require "resty.rocketmq.consumer"

Methods

new

syntax: c = consumer.new(nameservers, consumerGroup)

nameservers is a list of nameserver addresses

addRPCHook

syntax: c:addRPCHook(hook)

hook is a table that contains two functions as follows:

  • doBeforeRequest(self, addr, header, body)
  • doAfterResponse(self, addr, header, body, respHeader, respBody)

An ACL hook is provided. Usage:

    local accessKey, secretKey = "RocketMQ", "12345678"
    local aclHook = require("resty.rocketmq.acl_rpchook").new(accessKey, secretKey)
    c:addRPCHook(aclHook)

setUseTLS

syntax: c:setUseTLS(useTLS)

useTLS is a boolean

setTimeout

syntax: c:setTimeout(timeout)

timeout is in milliseconds, default 3000

registerMessageListener

syntax: c:registerMessageListener(messageListener)

messageListener is a table that contains a function as follows:

  • consumeMessage(self, msgs, context)
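
A listener can be built from a per-message handler, as in the sketch below. make_listener is a hypothetical helper; the status to return is passed in as a parameter so the block stays self-contained, and in real use it would be consumer.CONSUME_SUCCESS as in the Synopsis.

```lua
-- Hypothetical factory: wraps a per-message handler in the listener
-- table expected by c:registerMessageListener.
local function make_listener(success_status, handle)
    return {
        consumeMessage = function(self, msgs, context)
            for _, msg in ipairs(msgs) do
                handle(msg)
            end
            return success_status
        end,
    }
end

-- With the consumer module and a consumer c as in the Synopsis:
--   c:registerMessageListener(make_listener(consumer.CONSUME_SUCCESS,
--       function(msg) ngx.log(ngx.INFO, "got one message") end))
```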

registerConsumeMessageHook

syntax: c:registerConsumeMessageHook(hook)

hook is a table that contains two functions as follows:

  • consumeMessageBefore(self, context)
  • consumeMessageAfter(self, context)

context is a table that contains:

  • consumerGroup
  • mq
  • msgList
  • success
  • status
  • consumeContextType
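
For example, a hook could keep simple consumption counters. This sketch assumes that context.msgList is an array of messages and that context.success is falsy when consumption failed; neither detail is spelled out above, and consume_stats is a hypothetical name.

```lua
-- Hypothetical counters updated by the hook below.
local consume_stats = { batches = 0, messages = 0, failures = 0 }

local consume_hook = {
    -- Counts batches and messages; assumes msgList is an array.
    consumeMessageBefore = function(self, context)
        consume_stats.batches = consume_stats.batches + 1
        consume_stats.messages = consume_stats.messages + #(context.msgList or {})
    end,

    -- Counts failed batches; assumes success is falsy on failure.
    consumeMessageAfter = function(self, context)
        if not context.success then
            consume_stats.failures = consume_stats.failures + 1
        end
    end,
}

-- With a consumer c created as in the Synopsis:
--   c:registerConsumeMessageHook(consume_hook)
```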

subscribe

syntax: c:subscribe(topic, subExpression)

start

syntax: c:start()

stop

syntax: c:stop()

setAllocateMessageQueueStrategy

syntax: c:setAllocateMessageQueueStrategy(strategy)

strategy is a function with the following signature:

  • function(consumerGroup, currentCID, mqAll, cidAll)

The default is consumer.AllocateMessageQueueAveragely.
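
A custom strategy only needs to match that signature. The sketch below deals queues out round-robin; it assumes the function should return the list of queues assigned to currentCID (as the equivalent interface in RocketMQ's Java client does) and that mqAll and cidAll arrive in the same order on every consumer. allocate_round_robin is a hypothetical name.

```lua
-- Hypothetical strategy: queue i goes to consumer ((i - 1) mod n) + 1,
-- where n is the number of consumer IDs. The return shape (a list of
-- queues for currentCID) is an assumption.
local function allocate_round_robin(consumerGroup, currentCID, mqAll, cidAll)
    local index
    for i, cid in ipairs(cidAll) do
        if cid == currentCID then
            index = i
            break
        end
    end

    local result = {}
    if not index then
        return result  -- this consumer is not in cidAll: take nothing
    end

    for i, mq in ipairs(mqAll) do
        if (i - 1) % #cidAll + 1 == index then
            result[#result + 1] = mq
        end
    end
    return result
end

-- With a consumer c created as in the Synopsis:
--   c:setAllocateMessageQueueStrategy(allocate_round_robin)
```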

getAllocateMessageQueueStrategy

syntax: local strategy = c:getAllocateMessageQueueStrategy()

setEnableMsgTrace

syntax: c:setEnableMsgTrace(enableMsgTrace)

isEnableMsgTrace

syntax: local enableMsgTrace = c:isEnableMsgTrace()

setCustomizedTraceTopic

syntax: c:setCustomizedTraceTopic(customizedTraceTopic)

getCustomizedTraceTopic

syntax: local customizedTraceTopic = c:getCustomizedTraceTopic()

setConsumeFromWhere

syntax: c:setConsumeFromWhere(consumeFromWhere)

getConsumeFromWhere

syntax: local consumeFromWhere = c:getConsumeFromWhere()

setConsumeTimestamp

syntax: c:setConsumeTimestamp(consumeTimestamp)

getConsumeTimestamp

syntax: local consumeTimestamp = c:getConsumeTimestamp()

setPullThresholdForQueue

syntax: c:setPullThresholdForQueue(pullThresholdForQueue)

getPullThresholdForQueue

syntax: local pullThresholdForQueue = c:getPullThresholdForQueue()

setPullThresholdSizeForQueue

syntax: c:setPullThresholdSizeForQueue(pullThresholdSizeForQueue)

getPullThresholdSizeForQueue

syntax: local pullThresholdSizeForQueue = c:getPullThresholdSizeForQueue()

setPullTimeDelayMillsWhenException

syntax: c:setPullTimeDelayMillsWhenException(pullTimeDelayMillsWhenException)

getPullTimeDelayMillsWhenException

syntax: local pullTimeDelayMillsWhenException = c:getPullTimeDelayMillsWhenException()

setPullBatchSize

syntax: c:setPullBatchSize(pullBatchSize)

getPullBatchSize

syntax: local pullBatchSize = c:getPullBatchSize()

setPullInterval

syntax: c:setPullInterval(pullInterval)

getPullInterval

syntax: local pullInterval = c:getPullInterval()

setConsumeMessageBatchMaxSize

syntax: c:setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize)

getConsumeMessageBatchMaxSize

syntax: local consumeMessageBatchMaxSize = c:getConsumeMessageBatchMaxSize()

setMaxReconsumeTimes

syntax: c:setMaxReconsumeTimes(maxReconsumeTimes)

getMaxReconsumeTimes

syntax: local maxReconsumeTimes = c:getMaxReconsumeTimes()

setClientRebalance

syntax: c:setClientRebalance(clientRebalance)

isClientRebalance

syntax: local clientRebalance = c:isClientRebalance()

setPopThresholdForQueue

syntax: c:setPopThresholdForQueue(popThresholdForQueue)

getPopThresholdForQueue

syntax: local popThresholdForQueue = c:getPopThresholdForQueue()

setPopInvisibleTime

syntax: c:setPopInvisibleTime(popInvisibleTime)

getPopInvisibleTime

syntax: local popInvisibleTime = c:getPopInvisibleTime()

setPopBatchNums

syntax: c:setPopBatchNums(popBatchNums)

getPopBatchNums

syntax: local popBatchNums = c:getPopBatchNums()

resty.rocketmq.admin

To load this module:

    local admin = require "resty.rocketmq.admin"

Methods

new

syntax: adm = admin.new(nameservers)

nameservers is a list of nameserver addresses

addRPCHook

syntax: adm:addRPCHook(hook)

hook is a table that contains two functions as follows:

  • doBeforeRequest(self, addr, header, body)
  • doAfterResponse(self, addr, header, body, respHeader, respBody)

An ACL hook is provided. Usage:

    local accessKey, secretKey = "RocketMQ", "12345678"
    local aclHook = require("resty.rocketmq.acl_rpchook").new(accessKey, secretKey)
    adm:addRPCHook(aclHook)

setUseTLS

syntax: adm:setUseTLS(useTLS)

useTLS is a boolean

setTimeout

syntax: adm:setTimeout(timeout)

timeout is in milliseconds, default 3000

createTopic

syntax: res, err = adm:createTopic(newTopic, queueNum, topicSysFlag)

  • newTopic: the new topic name
  • queueNum: number of read and write queues
  • topicSysFlag: system flag of the topic

createTopicForBroker

syntax: res, err = adm:createTopicForBroker(addr, topicConfig)

  • addr: broker address
  • topicConfig: a table containing:
    • topic
    • readQueueNums
    • writeQueueNums
    • perm
    • topicFilterType
    • topicSysFlag
    • order
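
The topicConfig table could be built with a helper like the one below. This is a sketch: default_topic_config is a hypothetical name, and the values for perm, topicFilterType, topicSysFlag and order mirror the defaults of RocketMQ's Java TopicConfig; that this library accepts them in exactly this form is an assumption.

```lua
-- Hypothetical helper: fills a topicConfig table with values that
-- mirror the Java client's TopicConfig defaults.
local function default_topic_config(topic, queues)
    return {
        topic = topic,
        readQueueNums = queues,
        writeQueueNums = queues,
        perm = 6,                        -- read (4) + write (2)
        topicFilterType = "SINGLE_TAG",  -- assumed to be accepted as a string
        topicSysFlag = 0,
        order = false,
    }
end

local cfg = default_topic_config("TopicTest", 8)
-- With an admin adm created via admin.new(nameservers); the broker
-- address below is only an example:
--   local res, err = adm:createTopicForBroker("127.0.0.1:10911", cfg)
```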

searchOffset

syntax: res, err = adm:searchOffset(mq, timestamp)

  • mq: a table containing:
    • brokerName
    • topic
    • queueId
  • timestamp: search time

maxOffset

syntax: res, err = adm:maxOffset(mq)

  • mq: a table containing:
    • brokerName
    • topic
    • queueId

minOffset

syntax: res, err = adm:minOffset(mq)

  • mq: a table containing:
    • brokerName
    • topic
    • queueId
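
Since the offset methods share the same mq table, they combine naturally. The sketch below fetches both bounds for one queue; queue_offsets is a hypothetical helper, and it assumes each call returns nil plus an error on failure, following the res, err convention in the syntax lines above.

```lua
-- Hypothetical helper: returns { min = ..., max = ... } for one queue,
-- or nil and an error message. adm is any object exposing
-- minOffset(self, mq) and maxOffset(self, mq).
local function queue_offsets(adm, brokerName, topic, queueId)
    local mq = { brokerName = brokerName, topic = topic, queueId = queueId }

    local min, err = adm:minOffset(mq)
    if not min then
        return nil, "minOffset failed: " .. tostring(err)
    end

    local max
    max, err = adm:maxOffset(mq)
    if not max then
        return nil, "maxOffset failed: " .. tostring(err)
    end

    return { min = min, max = max }
end

-- With an admin adm created via admin.new(nameservers); the broker
-- name below is only an example:
--   local offsets, err = queue_offsets(adm, "broker-a", "TopicTest", 0)
```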

earliestMsgStoreTime

syntax: res, err = adm:earliestMsgStoreTime(mq)

  • mq: a table containing:
    • brokerName
    • topic
    • queueId

viewMessage

syntax: res, err = adm:viewMessage(offsetMsgId)

queryMessage

syntax: res, err = adm:queryMessage(topic, key, maxNum, beginTime, endTime, isUniqKey)

queryTraceByMsgId

syntax: res, err = adm:queryTraceByMsgId(traceTopic, msgId)

Proxy quick start

The proxy provides an HTTP API for producing and consuming messages.

Install and start RocketMQ

wget https://archive.apache.org/dist/rocketmq/4.9.3/rocketmq-all-4.9.3-bin-release.zip
unzip rocketmq-all-4.9.3-bin-release.zip
cd rocketmq-4.9.3
nohup bash bin/mqnamesrv &
nohup bash bin/mqbroker -n localhost:9876 -c conf/broker.conf &

Start proxy

cd examples/
openresty -c server/proxy.conf -p .

Send message using proxy

curl localhost:8080/topics/topic1/messages -d '{"properties": {"a": "3", "KEYS": "key", "TAGS": "tag"}, "body": "hello proxy"}'

Installation

You need to configure the lua_package_path directive to add the path of your lua-resty-rocketmq source tree to ngx_lua's LUA_PATH search path, as in

    # nginx.conf
    http {
        lua_package_path "/path/to/lua-resty-rocketmq/lib/?.lua;;";
        ...
    }

Ensure that the system account running your Nginx worker processes has permission to read the .lua files.
