lua-resty-kafka's Issues

no resolver defined to resolve "name"

Hello! Maybe you can help me.
I have a producer:

    local broker_list = {
        { host = "127.0.0.1", port = 9092 }
    }
    local p = producer:new(broker_list)
    local key = "key"
    local mess = "hi"
    local offset, err = p:send('test', key, mess)

I get the response:
no resolver defined to resolve "esha"
I read about the nginx resolver, but that did not solve my problem.
In my /etc/resolv.conf:
nameserver 127.0.1.1
In my /etc/hosts:

127.0.0.1   localhost
::1         localhost
27.0.1.1   esha

Where "esha" in error? If i have "localhost" in broker list ?

For now, I work around the problem with a "stub" in producer.lua:

    local config = brokers[leader]

    config.host = "127.0.0.1"
    local bk = broker:new(config.host, config.port, self.socket_config)
    self.producer_brokers[leader] = bk

    return bk

What would you recommend?
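A note on the likely cause: the nginx cosocket API does not consult /etc/hosts, and hostnames that Kafka returns in its metadata (here "esha") are resolved through nginx's resolver directive. A minimal sketch of the usual fix, assuming a DNS server that can resolve the broker hostname:

    # nginx.conf, inside the http block; the DNS address is an assumption
    resolver 8.8.8.8 valid=30s;

Alternatively, configure the broker to advertise an IP address instead of a hostname, so no DNS lookup is needed.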

In the code demo:

-- this is async producer_type and bp will be reused in the whole nginx worker
local bp = producer:new(broker_list, { producer_type = "async" })
local ok, err = p:send("test", key, message)

The demo defines bp but calls p:send; this should be:

    bp:send("test", key, message)

How to make it work with proxy_pass

Hi

Can this driver run in phases other than content_by_lua? If it can only run in content_by_lua, that conflicts with proxy_pass, which also runs in nginx's content phase.
How can I make this Kafka producer work when I have an upstream configuration?
My main goal is to send upstream_response_time to Kafka.

Thank you
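One pattern that may fit (a sketch, not from this issue): with producer_type = "async", bp:send() only buffers the message in the worker's memory; the actual network send happens later in an ngx.timer callback, where cosockets are allowed, so producing from the log phase does not conflict with proxy_pass handling the content phase:

    log_by_lua_block {
        local producer = require "resty.kafka.producer"
        -- broker address and topic name are assumptions
        local broker_list = { { host = "127.0.0.1", port = 9092 } }
        -- new() returns the cached per-worker async producer on repeat calls
        local bp = producer:new(broker_list, { producer_type = "async" })
        bp:send("access_log", nil, ngx.var.upstream_response_time or "-")
    }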

is this production ready?

The README says it is still being finalized, but that note is three years old.

all brokers failed in fetch topic metadata

2016/07/26 20:54:58 [error] 17646#0: *34 connect() failed (113: No route to host), client: 192.168.1.111, server: 192.168.1.160, request: "GET /meta HTTP/1.1", host: "192.168.1.160:4002"
2016/07/26 20:54:58 [error] 17646#0: *34 [lua] client.lua:209: _fetch_metadata(): all brokers failed in fetch topic metadata, client: 192.168.1.111, server: 192.168.1.160, request: "GET /meta HTTP/1.1", host: "192.168.1.160:4002"
2016/07/26 20:54:58 [error] 17646#0: *34 lua entry thread aborted: runtime error: content_by_lua(nginx.conf:71):16: attempt to index local 'brokers' (a nil value)
stack traceback:
coroutine 0:
    content_by_lua(nginx.conf:71): in function <content_by_lua(nginx.conf:71):1>, client: 192.168.1.111, server: 192.168.1.160, request: "GET /meta HTTP/1.1", host: "192.168.1.160:4002"
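The third error is a plain Lua error in the handler itself: fetch_metadata returns nil plus an error string on failure, so the result must be guarded before indexing. A minimal sketch (the topic name is an assumption):

    local brokers, partitions = cli:fetch_metadata("some_topic")
    if not brokers then
        -- on failure, the second return value carries the error string
        ngx.log(ngx.ERR, "fetch_metadata failed: ", partitions)
        return ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR)
    end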

async producer fails when first kafka instance in the list is down

I am using the async Kafka producer in our Lua code, and we recently hit a situation where one of the Kafka machines went down. Unfortunately, the async client kept failing in this case, and I guess it was the metadata request that was failing.

The way we configure a kafka cluster in our lua config:

local config = {
  ["kafka"]={
    { host = "down", port = 9092 },
    { host = "foo", port = 9092 },
    { host = "bar", port = 9092 },
  },
}

Host 'down' obviously went down during the incident; the others were up. All hosts are configured by IP address, not hostname.

The way we use async producer:

    ....
    local function send_messages_to_kafka(messages)
        local error_handle = function (topic, partition_id, queue, index, err, retryable)
            ngx.log(ngx.ERR, "failed to send to kafka: " .. err)
        end

        local producer_kafka, err = kafkaproducer:new(config.kafka, { producer_type = "async", refresh_interval = 5000, error_handle = error_handle })
        .....

And the result we observed in logs:


2016/04/24 12:04:06 [error] 22322#0: lua tcp socket connect timed out, context: ngx.timer, client: some.client.ip.here, server: 0.0.0.0:80
2016/04/24 12:04:06 [error] 22322#0: [lua] dataeye_logging_helper.lua:12: failed to send to kafka: timeout, context: ngx.timer, client: some.client.ip.here, server: 0.0.0.0:80

So my question here is: is there a good workaround for this problem? I can't really tell from the resty-kafka code whether it falls back across multiple Kafka instances.
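One knob that may help (a sketch; per the README, socket_timeout is the cosocket connect/read timeout, default 3000 ms): lower it so connecting to the dead host fails fast and the metadata fetch moves on to the next broker in the list sooner:

    local producer_kafka, err = kafkaproducer:new(config.kafka, {
        producer_type = "async",
        refresh_interval = 5000,
        socket_timeout = 1000,  -- assumption: fail fast on the unreachable host
        error_handle = error_handle,
    })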

cli:fetch_metadata: result has error "Cannot serialise table: excessively sparse array"

I added the request handler as in the README example. However, fetch_metadata() seemed to return an empty result, which caused the error.
local cli = client:new(broker_list)
local brokers, partitions = cli:fetch_metadata("exp_khuang")
ngx.say("brokers: ", cjson.encode(brokers), "; partitions: ", cjson.encode(partitions)) -- Error thrown here.

I noticed the traceback is client:fetch_metadata -> broker:send_receive(). I added some dumps in send_receive(): the request:packet() sent to the Kafka broker node is "true", and the received data is nil. I am not sure where it goes wrong. Could anyone kindly advise? Thanks a lot!
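Independent of the metadata problem, the cjson error has a standard workaround: the brokers table is keyed by broker id, which cjson may treat as an excessively sparse array. A sketch using lua-cjson's encode_sparse_array:

    local cjson = require "cjson"
    cjson.encode_sparse_array(true)  -- encode sparse tables as null-padded arrays instead of erroring
    ngx.say("brokers: ", cjson.encode(brokers), "; partitions: ", cjson.encode(partitions))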

send bytes to kafka throws an error

I'm trying to send a byte array to Kafka:
local ok, err = bp:send("m5", key, msg_bytes)
where msg_bytes is a byte table.
But it throws an error:
/lua-resty-kafka-master/lib/resty/kafka/request.lua:158: invalid value (table) at index 6 in table for 'concat'

Then I checked request.lua, and I found:


    local req = {
        -- MagicByte
        str_int8(0),
        -- XX hard code no Compression
        str_int8(0),
        str_int32(key_len),
        key,
        str_int32(len),
        msg,
    }

    local str = concat(req)

It seems I can't send bytes to Kafka, because the client needs to concat the key and the message.

Is there any way I can send the bytes to Kafka directly, without encoding them in Base64?
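Lua strings are 8-bit clean, so a byte table can be converted to a binary string and sent as-is, with no Base64 needed. A minimal sketch, assuming msg_bytes is an array of integers in the range 0-255:

    -- note: unpack has an argument-count limit, so very large tables
    -- should be converted in chunks
    local msg = string.char(unpack(msg_bytes))
    local ok, err = bp:send("m5", key, msg)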

lua-resty kafka

I used a hostname, and the error is:
[error] 13484#0: *105 [lua] client.lua:150: _fetch_metadata(): all brokers failed in fetch topic metadata, client: 127.0.0.1, server: localhost, request: "GET /test HTTP/1.1", host: "localhost"

    local broker_list = {
        { host = "storm01", port = 9092 }
    }

I hope you can help me.

ngx.req.get_body_data(): Chinese text sent through Kafka comes out garbled

    local producer = require "resty.kafka.producer"

    local broker_list = {
        { host = "10.7.13.112", port = 9092 },
        { host = "10.7.13.114", port = 9092 },
        { host = "10.7.13.115", port = 9092 },
    }
    ngx.req.read_body()
    local post = ngx.req.get_body_data()
    local bp = producer:new(broker_list, { producer_type = "async" })
    if post then
        local ok, err = bp:send("footPrint_eventlog", nil, post)
        if not ok then
            ngx.log(ngx.ERR, "kafka send err:", err)
            return
        end
        ngx.say('{"code":200,"description":"ok"}')
    else
        ngx.say('{"code":400,"description":"no content"}')
    end

Sending the POST data to Kafka succeeds and the message arrives, but Chinese text in it is garbled; writing the same POST body to a file via ngx.log shows no garbling. Could someone explain why?

no resolver defined to resolve "localhost"

The output is:
brokers: [{"host":"localhost","port":9092}]; partitions: {"0":{"id":0,"errcode":0,"replicas":[1],"isr":[1],"leader":1},"1":{"id":1,"errcode":0,"replicas":[1],"isr":[1],"leader":1},"errcode":0,"num":2}
send err1:no resolver defined to resolve "localhost"

The code:

    local cjson = require "cjson"
    local client = require "resty.kafka.client"
    local producer = require "resty.kafka.producer"

    local broker_list = {
        -- { host = "192.168.1.1", port = 9092 },
        { host = "127.0.0.1", port = 9092 }
    }

    local key = "key"
    local message = "halo world"

    -- usually we do not use this library directly
    local cli = client:new(broker_list)
    local brokers, partitions = cli:fetch_metadata("test")
    if not brokers then
        ngx.say("fetch_metadata failed, err:", partitions)
    end
    ngx.say("brokers: ", cjson.encode(brokers), "; partitions: ", cjson.encode(partitions))

    -- sync producer_type
    local p = producer:new(broker_list)

    local offset, err = p:send("test", key, message)
    if not offset then
        ngx.say("send err1:", err)
        return
    end
    ngx.say("send success, offset: ", tonumber(offset))

How can I solve it?
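The resolver directive will not help here, since DNS normally cannot resolve "localhost": the metadata output above shows the broker advertising the hostname "localhost", and the producer connects to whatever the metadata says rather than to the original broker_list. A sketch of the broker-side fix in Kafka's server.properties (which key applies depends on the Kafka version):

    # Kafka 0.8/0.9
    advertised.host.name=127.0.0.1
    # newer Kafka versions
    advertised.listeners=PLAINTEXT://127.0.0.1:9092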

send result reports success, but the consumer receives nothing

This is my code; the result shows: send success, offset: true.
But the consumer never receives anything:

    location ^~ /1 {
        default_type 'text/html';
        access_log off;
        access_by_lua '
            local producer = require "resty.kafka.producer"
            local broker_list = {
                { host = "192.168.210.24", port = 6667 },
                { host = "192.168.210.25", port = 6667 },
            }
            local producer_config = { producer_type = "async" }
            local p = producer:new(broker_list, producer_config)
            local offset, err = p:send("usertrajectory", nil, "1111111111111111111112222222222")
            if not offset then
                ngx.say("send err:", err)
                return
            end
            ngx.say("send success, offset: ", offset)
            return
        ';
    }

Messages aren't dispatched to Kafka, but no error is shown.

Hi,
I tried your producer example, but it's not working: I send a message, but Kafka doesn't receive it.
Lua gives me no errors, and both the "err" and the "resp" variables are nil.
I'm sure I can connect to Kafka from that machine, because I tested the connection with kafkacat and sending messages via the CLI works.

Memory leak when using async method for producing Kafka messages

Hi - we are running a portion of our network through OpenResty/Kafka using your driver, in order to test it out. We produce about 100-200 messages per second to Kafka.

When we set producer_type = async, memory utilization increases until the following error is written to our logs:

*emergency(): Unable to connect to kafka: buffer overflow

Can you please help us understand what is causing this memory leak? Thanks!

buffered messages send to kafka err timeout

nginx error log:
[error] 30952#0: *30248161 lua tcp socket read timed out, context: ngx.timer, client: xxxx, server: xxxx
[error] 30952#0: *30248161 [lua] producer.lua:258: buffered messages send to kafka err: timeout, retryable: nil, topic:xxxx, partition_id: 1, length: 1, context: ngx.timer, client: xxxx, server: xxxx

But it does not happen frequently; Kafka works fine and everything else is OK. I have set request_timeout to 10000 ms like this:

local bp = producer:new(broker_list, { producer_type = "async", request_timeout = 10000 })

but it does not seem to work.
Could you help me?
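Per the README, "lua tcp socket read timed out" is governed by socket_timeout (default 3000 ms), while request_timeout is the timeout carried inside the produce request for the broker side, so raising only request_timeout leaves the cosocket timing out first. A sketch (the exact semantics are an assumption; check your version's README):

    local bp = producer:new(broker_list, {
        producer_type = "async",
        request_timeout = 10000,  -- broker-side timeout in the produce request
        socket_timeout = 12000,   -- cosocket connect/read timeout
    })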

send err:not foundd topic

Hi,

I'm not sure why I'm getting this error; the topic does exist. I also tested it through a Python client with the same topic, and that worked.

....
....

    local broker_list = {
        { host = "127.0.0.1", port = 9092 },
    }

    local key = "key1"
    local message = "hello world"
    local p = producer:new(broker_list)

    local offset, err = p:send("test", key, message)
    if not offset then
        ngx.say("send err:", err)
        return
    end
    ngx.say("send success, offset: ", tostring(offset))

Using:
OpenResty 1.7.2.1
Kafka server 2.10-0.8.2.0

I also tested on 0.8.1 and 0.8.1.1.
Thanks,

kafka restarts: lua kafka producer send errors, nginx must be reloaded manually

Hi all:

When I restart my Kafka cluster, my resty-kafka producer can no longer send messages. I must reload nginx, and then it is OK. How can I make the resty-kafka producer automatically re-check the Kafka cluster?

producer.lua:258: buffered messages send to kafka err: not found broker, retryable: true, topic: test, partition_id: 0, length: 219, context: ngx.timer
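One relevant option (a sketch; per the README, refresh_interval defaults to nil, meaning metadata is never refreshed): enable periodic metadata refresh so the producer re-discovers brokers after a cluster restart:

    local bp = producer:new(broker_list, {
        producer_type = "async",
        refresh_interval = 30000,  -- re-fetch metadata every 30 s
    })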

return empty brokers

    local cli = client:new(broker_list)
    local brokers, partitions = cli:fetch_metadata(kafka.topic)
    if not brokers then
        ngx.log(ngx.ERR, "fetch_metadata failed, err:", partitions)
    end
    ngx.log(ngx.ERR, "brokers: ", #brokers, "; partitions: ", cjson.encode(partitions))

I use the code above in my macOS environment, and I can confirm the broker_list is reachable. Is it a bug that it returns empty brokers while the partition info is OK?

In async model , buffer overflow occurs frequently

We use lua-resty-kafka to send our logs to Kafka.
The QPS is 6K+, and the size per request is 0.6 KB.
However, we see many buffer overflow errors in the error log.
I traced the error to ringbuffer.lua:

function _M.add(self, topic, key, message)
    local num = self.num
    local size = self.size

    if num >= size then
        return nil, "buffer overflow"
    end

    local index = (self.start + num) % size
    local queue = self.queue

    queue[index] = topic
    queue[index + 1] = key
    queue[index + 2] = message

    self.num = num + 3

    return true
end

What config should I set?
And what does this error mean?
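The error means the ring buffer is full: as the pasted code shows, each message occupies three queue slots (topic, key, message), and add() rejects new messages once num reaches size. A sketch of a config that gives the flush timer more headroom (the quoted defaults are assumptions taken from the README):

    local bp = producer:new(broker_list, {
        producer_type = "async",
        max_buffering = 200000,  -- buffer capacity in messages; default 50000
        flush_time = 500,        -- flush interval in ms; default 1000
        batch_num = 500,         -- messages per produce request; default 200
    })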

Scope of the producer and bp

In the README example, a producer is instantiated in the scope of a location. Is it possible to do the require statement in an init_by_lua_block? I have the same question for bp.

So, will the following work as expected, or do you believe it is not thread-safe?

http {
  lua_package_path "/data/programs/lua-resty-kafka/lib/?.lua;;";

  init_by_lua_block {
    cjson = require("cjson")

    local kafka_broker_list = {{host = "kafka1.mydomain.com", port = 9092}}
    local kafka_producer_config = {producer_type = "async"}

    -- can this be instantiated here ?
    local producer = require "resty.kafka.producer"
    -- edited this line: made bp global at the VM level
    bp = producer:new(kafka_broker_list, kafka_producer_config)
  }

  server {
    location /test {
      content_by_lua_block {
        local args, err = ngx.req.get_uri_args()
        local message = {
          test_arg1 = args.test_arg1 or nil
        }
        message = cjson.encode(message)
        local ok, err = bp:send("test_topic", "test_key", message)
        if not ok then
          ngx.say("send err:", err)
          return
        end

        ngx.say("send success, ok:", ok)
      }
    }
  }
}
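One caveat with this exact placement (an inference from the async implementation, not from the README): producer:new with producer_type = "async" schedules an ngx.timer, and timers are not available in init_by_lua, so creating bp there can fail with a "no request" error. init_worker_by_lua_block runs once per worker and does allow timers, so a sketch of a safer placement:

    init_worker_by_lua_block {
        local producer = require "resty.kafka.producer"
        local kafka_broker_list = { { host = "kafka1.mydomain.com", port = 9092 } }
        -- global on purpose: reused by every request in this worker
        bp = producer:new(kafka_broker_list, { producer_type = "async" })
    }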

runtime error: content_by_lua(lua-kafka.conf:50):21: Cannot serialise table

1. Deployment environment
openresty-1.9.15.1.tar.gz + latest lua-resty-kafka + CDH 5.4.0 (kafka 1.3.0) + CentOS 6.4 64-bit

2. The nginx configuration for lua-resty-kafka

    server {
        listen 8081;
        server_name 192.168.15.84;
        location /test {
            content_by_lua '
                local cjson = require "cjson"
                local client = require "resty.kafka.client"
                local producer = require "resty.kafka.producer"

                local broker_list = {
                    { host = "192.168.15.81", port = 9092 },
                    { host = "192.168.15.82", port = 9092 },
                    { host = "192.168.15.83", port = 9092 }
                }

                local key = "key"
                local message = "halo world"

                -- usually we do not use this library directly
                local cli = client:new(broker_list)
                local brokers, partitions = cli:fetch_metadata("test")
                if not brokers then
                    ngx.say("fetch_metadata failed, err:", partitions)
                end
                ngx.say("brokers: ", cjson.encode(brokers), "; partitions: ", cjson.encode(partitions))

                -- sync producer_type
                local p = producer:new(broker_list)

                local offset, err = p:send("test", key, message)
                if not offset then
                    ngx.say("send err:", err)
                    return
                end
                ngx.say("send success, offset: ", tonumber(offset))

                -- this is async producer_type and bp will be reused in the whole nginx worker
                local bp = producer:new(broker_list, { producer_type = "async" })

                local ok, err = bp:send("test", key, message)
                if not ok then
                    ngx.say("send err:", err)
                    return
                end

                ngx.say("send success, ok:", ok)
            ';
        }
    }

3. Example request
curl -X POST -H ' charset=UTF-8' -d 'sssss' http://192.168.15.84:8081/test

4. The problem (data cannot be written to Kafka); log details:
2016/06/21 18:10:30 [error] 22381#0: *1 lua entry thread aborted: runtime error: content_by_lua(lua-kafka.conf:50):21: Cannot serialise table: excessively sparse array
stack traceback:
coroutine 0:
[C]: in function 'encode'
content_by_lua(lua-kafka.conf:50):21: in function <content_by_lua(lua-kafka.conf:50):1>, client: 192.168.15.84, server: 192.168.15.84, request: "POST /test HTTP/1.1", host: "192.168.15.84:8081"

5. Many thanks; looking forward to your answer.

broker_list parameter does not take effect when producer_type = "async" is passed to the producer constructor

I have multiple routes in my nginx config; each route produces messages for one Kafka topic into its own Kafka cluster. The problem I ran into: when I added a new route to send a topic into a new Kafka cluster with the producer_type = "async" parameter, the new topic's messages were still sent to the Kafka cluster configured in the old route. I worked around this by removing the producer_type = "async" parameter.

Maybe this will help.
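This matches the constructor's caching behavior: async producers are cached per worker under a cluster_name (the third argument to new(), default 1), so two new() calls with different broker lists but the same cluster_name can return the same cached instance. A sketch giving each cluster its own name (the list names are placeholders):

    -- producer:new(broker_list, producer_config, cluster_name)
    local bp_old = producer:new(old_broker_list, { producer_type = "async" }, "cluster_old")
    local bp_new = producer:new(new_broker_list, { producer_type = "async" }, "cluster_new")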

Usage/installation?

Hi,
Could you provide a small example of how to install and use the library?
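A minimal sketch (paths and addresses are placeholders): clone the repository, point lua_package_path at its lib directory, and call the producer from a content handler:

    # nginx.conf
    http {
        lua_package_path "/path/to/lua-resty-kafka/lib/?.lua;;";

        server {
            listen 8080;
            location /test {
                content_by_lua_block {
                    local producer = require "resty.kafka.producer"
                    local broker_list = { { host = "127.0.0.1", port = 9092 } }
                    local p = producer:new(broker_list)
                    local offset, err = p:send("test", "key", "hello world")
                    if not offset then
                        ngx.say("send err: ", err)
                        return
                    end
                    ngx.say("send success, offset: ", tonumber(offset))
                }
            }
        }
    }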

too many pending timers

If I send quickly, the error log shows:

[lua] producer.lua:280: _flush_buffer(): failed to create timer at _flush_buffer, err: too many pending timers.

But watching the pending and running values with ngx.timer.pending_count() and ngx.timer.running_count(), the pending value peaks at 2 and the running value peaks at 1.

These are my nginx.conf settings:

lua_max_pending_timers 1024;
lua_max_running_timers 256;

Thanks!

ERROR: malformed number

2015/08/14 15:03:34 [error] 17858#0: *122 failed to run log_by_lua*: error loading module 'resty.kafka.response' from file '/usr/local/sh
    /usr/local/share/lua/5.1/resty/kafka/response.lua:76: malformed number near '4294967296LL'
stack traceback:
    [C]: ?
    [C]: in function 'require'
    /usr/local/share/lua/5.1/resty/kafka/producer.lua:4: in main chunk
    [C]: in function 'require'
    /etc/nginx/lua/log/log_into_kafka.lua:1: in function </etc/nginx/lua/log/log_into_kafka.lua:1> while logging request, client: 14.153.
2015/08/14 15:03:35 [error] 17858#0: *122 failed to run log_by_lua*: /etc/nginx/lua/log/log_into_kafka.lua:1: loop or previous error load
stack traceback:
    [C]: in function 'require'

no offset and no error

I run the following code:

  local p = producer:new(broker_list)
  local offset, err = p:send(topic, key, body)
  if not offset then
    ngx.log(ngx.ERR, err)
    return ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR)
  end

I get some 500 errors, but when I log the error there is nothing in it either.

2017/01/20 16:33:02 [error] 10#10: *719 [lua] script.lua:33: , client: 10.10.10.10, server: api.example.com, request: "POST /some/path HTTP/1.1", host: "api.example.com"

I can't figure out how to debug this. Any ideas?
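A small debugging aid (a sketch): log both return values explicitly so that a nil error is distinguishable from an empty string in the log line:

    local offset, err = p:send(topic, key, body)
    if not offset then
        ngx.log(ngx.ERR, "kafka send failed, offset: ", tostring(offset),
                ", err: ", tostring(err))
        return ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR)
    end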

104: Connection reset by peer

The error log is:

2016/07/27 14:02:56 [error] 7253#0: *25 recv() failed (104: Connection reset by peer), client: 192.168.1.111, server: 192.168.1.160, request: "GET /collect HTTP/1.1", host: "192.168.1.160:4002"

and the config is:

location /collect {
            lua_need_request_body on;
            client_max_body_size 5M;
            client_body_buffer_size 256k;
            default_type application/json;

            content_by_lua '
                local cjson = require "cjson"
                local client = require "resty.kafka.client"
                local producer = require "resty.kafka.producer"

                local broker_list = {
                    { host = "192.168.1.151", port = 9092 }
                }
                local key = "key"
                local message = "halo world"

                -- sync producer_type
                local p = producer:new(broker_list)

                local offset, err = p:send("demo2", key, message)
                if not offset then
                    ngx.say("send err:", err)
                    return
                end
                ngx.say("send success, offset: ", tonumber(offset))

                ngx.say("send success, ok:", ok)
            ';
        }

kafka throws an exception

java.lang.IllegalArgumentException: A metric named 'MetricName [name=throttle-time, group=Produce, description=Tracking average throttle-time per client, tags={client-id=worker:26556}]' already exists, can't register another one

Kafka consumer?

Can lua-resty-kafka be used as a Kafka data consumer, or only as a Kafka data producer?

send err:tmp.cn could not be resolved

Server 1: IP is 100.100.100.101 (something like this).
It runs nginx, lua-resty-kafka, and an interface that is called over HTTP; the interface uses lua-resty-kafka as a producer.
Server 2: IP is 200.100.100.101 (something like this).
Kafka runs on server 2 in single-node mode.
Server 2's /etc/hosts contains:
127.0.0.1 localhost
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
200.100.100.101 tmp.cn

When I call the interface on server 1, it gives the error: send err:tmp.cn could not be resolved
I already used issue #5 as a reference to set parameters:
on server 2, I set host.name=127.0.0.1 or host.name=200.100.100.101,
and
on server 1, I set resolver 200.100.100.101 in the nginx configuration,
then restarted nginx on server 1.
These methods still do not solve my problem;
it still gives this error.

I wonder why it needs to resolve the Kafka server's domain name at all; aren't the IP address and port enough to communicate with the Kafka server?
Thank you

Getting 'no request' error with producer.

I've attempted to use the Kafka producer to send messages to a Kafka instance on the same local private network. Here's an example of the relevant kafka producer code that I'm using:

_kafka_broker_list = {
  {host = "10.10.250.10", port = 9092}
}
_kafka_producer = _kafka:new(_kafka_broker_list, {producer_type = "async"})

-- Other work gets done here

local kafka_send_ok, kafka_send_err = _kafka_producer:send(list, "nginx", '{"message": "this is a test", "type": "test"}')
if not kafka_send_ok then
   ngx.log(ngx.ERR, "Could not send message to Kafka broker:", kafka_send_err)
   return
end

but unfortunately I get this error when I start nginx:

2017/05/09 18:07:17 [error] 8#8: init_by_lua error: /usr/local/openresty/site/lualib/resty/kafka/producer.lua:282: no request
stack traceback:
	[C]: in function 'timer_at'
	/usr/local/openresty/site/lualib/resty/kafka/producer.lua:282: in function '_flush_buffer'
	/usr/local/openresty/site/lualib/resty/kafka/producer.lua:291: in function '_timer_flush'
	/usr/local/openresty/site/lualib/resty/kafka/producer.lua:333: in function 'new'
	init_by_lua:51: in main chunk

I can't seem to figure out why I'm getting this. Would you have any ideas? Thanks for your help.
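The traceback shows new() being called from init_by_lua (init_by_lua:51), where ngx.timer.at is not available; the async producer schedules a flush timer on creation. A common workaround (a sketch reusing the names from the snippet above) is to create the producer lazily on the first request:

    -- created on first use in a request phase instead of in init_by_lua
    local _kafka_producer

    local function get_producer()
        if not _kafka_producer then
            _kafka_producer = _kafka:new(_kafka_broker_list, { producer_type = "async" })
        end
        return _kafka_producer
    end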

producer: how to retry sending a message in the error_handle callback

I mean: how do I access the producer instance I create later?

    local producer_config = {
        producer_type = "async",
        error_handle = function(topic, partition_id, message_queue, index, err, retryable)
            if retryable then
                for i = 1, index do
                    local key = message_queue[i * 2 - 1]
                    local message = message_queue[i * 2]
                    -- how to retry ?
                end
            else
                ngx.log(ngx.ERR, "send to kafka error and retryable is false: ", err)
            end
        end
    }

local bp = producer:new(broker_list, producer_config)
local topic = "nginx_log"

message send succeeds but Kafka does not receive it

hi ~ @doujiang24:
I have a problem where a message sends successfully (sent status), but Kafka never receives it. The problem occurs a day or two after starting nginx. If I run "nginx -s reload" or restart nginx, the problem goes away (sends succeed and Kafka receives the messages).
Below are my Lua script and nginx configuration:
    local cjson = require "cjson"
    local client = require "resty.kafka.client"
    local producer = require "resty.kafka.producer"
    local broker_list = {
        { host = "192.168.100.29", port = 9092 },
        { host = "192.168.100.30", port = 9092 },
        { host = "192.168.100.31", port = 9092 },
        -- { host = "sz-space3.novalocal", port = 9092 },
        -- { host = "sz-space4.novalocal", port = 9092 },
        -- { host = "sz-space5.novalocal", port = 9092 }
    }
    local key = os.date("%s")
    local body = ngx.req.get_body_data()
    local message = body
    local cli = client:new(broker_list)
    -- local brokers, partitions = cli:fetch_metadata("cleancenter_spaceplus_cleantrack_test_topic")
    local brokers, partitions = cli:fetch_metadata("cleancenter_spaceplus_cleantrack_topic0")
    if not brokers then
        ngx.say("fetch_metadata failed, err:", partitions)
        return
    end
    -- ngx.say("brokers: ", cjson.encode(brokers), "; partitions: ", cjson.encode(partitions))
    -- local p = producer:new(broker_list)
    -- local offset, err = p:send("cleancenter_spaceplus_cleantrack_topic0", key, message)
    -- if not offset then
    --     ngx.say("send err:", err)
    --     return
    -- end
    -- ngx.say("send success, offset: ", tonumber(offset))
    -- this is async producer_type and bp will be reused in the whole nginx worker
    local bp = producer:new(broker_list, { producer_type = "async" })
    -- local ok, err = bp:send("cleancenter_spaceplus_cleantrack_test_topic", key, message)
    local ok, err = bp:send("cleancenter_spaceplus_cleantrack_topic0", key, message)
    if not ok then
        ngx.say("send err:", err)
        return
    end
    ngx.log(ngx.ERR, "test", "ssss")
    ngx.say("send success, ok:", ok)
    -- ngx.say("send success, ok:", message)

    # spot point log config begin
    lua_package_path "/data/softwares/openresty-1.9.15.1/lualib/?.lua;;";
    lua_need_request_body on;
    server {
        # listen port
        listen 8080;
        server_name 127.0.0.1;
        location /clean/track {
            content_by_lua_file /data/softwares/openresty-1.9.15.1/nginx/conf/lua/spot_point_log1.lua;
            #log_by_lua /data/softwares/openresty-1.9.15.1/nginx/conf/lua/spot_point_log1.lua;
        }
    }

lua-resty-kafka error

Failed to connect to Kafka: [lua] client.lua:150: _fetch_metadata(): all brokers failed in fetch topic metadata
The returned result is: fetch_metadata failed, err:not foundd topic
But the topic is already created.
Why?

Getting error when sending message

Hi there,
I'm trying to use this module but get errors like the ones below. Do you have any idea why? The "domain-test" topic should exist on my broker.

2014/12/17 10:37:55 [error] 32594#0: *31 lua entry thread aborted: runtime error: /usr/share/lualib/resty/kafka/request.lua:167: 'for' limit must be a number
stack traceback:
coroutine 0:
        /usr/share/lualib/resty/kafka/request.lua: in function 'message_set'
        /usr/share/lualib/resty/kafka/producer.lua:76: in function 'produce_encode'
        /usr/share/lualib/resty/kafka/producer.lua:248: in function 'send'
        /etc/nginx/lua/info_tests.lua:42: in function </etc/nginx/lua/info_tests.lua:1>, 

with info_tests.lua being something like this (line numbers are not correct):

local cjson = require "cjson"
local producer = require "resty.kafka.producer"

local broker_list = {
  { host = "xxxx", port = xxx },
}

local key = "key"
local message = "halo world"

local bp = producer:new(broker_list, { producer_type = "async" })

local size, err = bp:send("domain-test", key, message)
if not size then
  ngx.say("send err:", err)
  return
end

buffered messages send to kafka err: MessageSizeTooLarge

2016/12/22 22:32:33 [error] 13413#0: *378703 [lua] producer.lua:258: buffered messages send to kafka err: MessageSizeTooLarge, retryable: false, topic: ns-source-wimonitor, partition_id: 2, length: 111, context: ngx.timer, client: 120.237.52.234, server: 0.0.0.0:443

I got this error in error.log. Should I increase the message size? What should I do?
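MessageSizeTooLarge is returned by the broker, so the limit to raise is broker-side. A sketch for Kafka's server.properties (the 10 MB value is an arbitrary example):

    message.max.bytes=10485760
    # keep replication able to carry the larger messages too
    replica.fetch.max.bytes=10485760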

Errcode and offset field of producer response are both -1

When I connected lua-resty-kafka to Kafka 0.9.0.1, the errcode and offset fields in the response were both -1, and error messages appeared in the nginx error.log (line 258, producer.lua).

After I changed line 153 in producer.lua as follows, everything works fine:
if errcode == 0 or errcode == -1 then

Is there any plan to upgrade the protocol implementation to support Kafka API version 2?

BTW, @doujiang24 great project. Thanks
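For reference, later lua-resty-kafka versions expose an api_version option in the producer config (an assumption; verify against your version's README), which lets the produce request/response match newer brokers without patching producer.lua:

    local p = producer:new(broker_list, { api_version = 2 })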

nginx cannot resolve the Kafka hostname: ERROR no resolver defined to resolve "hdp5.novalocal"

I use ZooKeeper to manage Kafka, and the broker information in ZooKeeper looks like this:
[zk: localhost:2181(CONNECTED) 1] get /brokers/ids/1001
{"jmx_port":-1,"timestamp":"1492749719841","endpoints":["PLAINTEXT://hdp5.novalocal:6667"],"host":"hdp5.novalocal","version":3,"port":6667}

When my nginx connects to Kafka, the connection fails.
The error message is:
no resolver defined to resolve "hdp5.novalocal"

I tried configuring my hosts file, adding the IP and nameserver, but it does not work.

How can I resolve this issue?

no resolver defined to resolve

The Lua code in nginx.conf:
ngx.say("[start lua kafka]")
local cjson = require "cjson"
local client = require "resty.kafka.client"
local producer = require "resty.kafka.producer"
local broker_list = {
{ host = "127.0.0.1", port = 9094},
}
local topic = "hellokafka";
local cli = client:new(broker_list)
local brokers, partitions = cli:fetch_metadata(topic)

ngx.say("[fetch_metadata]");
if not brokers then
    ngx.say("fetch_metadata failed, err: ", partitions)

end
ngx.say("brokers: ", cjson.encode(brokers), ";\npartitions: ", cjson.encode(partitions), ";")
ngx.say("[send message]");

local key = nil
local message = "hello nginx kafka lua"
local bp = producer:new(broker_list)
local ok, err = bp:send(topic, nil, message)
if not ok then
    ngx.say("send err:", err)
return
end
ngx.say("send success, ok:", ok)

Output:
[start lua kafka]
[fetch_metadata]
brokers: [null,{"host":"cp01-rdqa-dev340.cp01.baidu.com","port":9094}];
partitions: {"errcode":0,"num":1,"0":{"id":0,"errcode":0,"replicas":[2],"isr":[2],"leader":2}};
[send message]
send err:no resolver defined to resolve "cp01-rdqa-dev340.cp01.baidu.com"

Why can't it resolve the hostname? Please help me.

lua-resty-kafka require error

When I require resty.kafka.producer, the error log shows:

2016/08/26 14:10:54 [error] 11222#0: *284 lua entry thread aborted: runtime error: error loading module 'resty.kafka.response' from file '/usr/local/lib/lua/resty/kafka/response.lua':
/usr/local/lib/lua/resty/kafka/response.lua:76: malformed number near '4294967296LL'
stack traceback:
coroutine 0:
[C]: ?
[C]: in function 'require'
/usr/local/lib/lua/resty/kafka/producer.lua:4: in main chunk
[C]: in function 'require'
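The '4294967296LL' literal is a LuaJIT extension (a 64-bit integer suffix), so this error means the module is being loaded by standard Lua 5.1 rather than LuaJIT, which OpenResty bundles. A sketch of a quick check (the jit table only exists under LuaJIT):

    if not jit then
        error("lua-resty-kafka requires LuaJIT; run it under OpenResty's bundled LuaJIT")
    end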

Manually select Kafka partition

Hi,
Is it possible to manually select the Kafka partition when sending messages?
I noticed that in the current implementation all messages are sent to partition 1; is that correct?
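Partition selection is driven by the message key: the producer config accepts a partitioner callback receiving the key, the partition count, and a correlation id, and its return value picks the partition. A sketch of a manual selector (the routing policy is a made-up example):

    local p = producer:new(broker_list, {
        partitioner = function(key, num, correlation_id)
            -- e.g. route by numeric key; return a value in 0 .. num - 1 (assumption)
            return tonumber(key) % num
        end,
    })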
