doujiang24 / lua-resty-kafka Goto Github PK
View Code? Open in Web Editor NEWLua kafka client driver for the Openresty based on the cosocket API
License: BSD 3-Clause "New" or "Revised" License
Lua kafka client driver for the Openresty based on the cosocket API
License: BSD 3-Clause "New" or "Revised" License
Hello! May be you can help me.
I have some producer:
local broker_list = {
{ host = "127.0.0.1", port = 9092 }
}
local p = producer:new(broker_list)
local key = "key"
local mess = "hi"
local offset, err = p:send('test', key, mess)
I get responce:
no resolver defined to resolve "esha"
I searched about nginx resolver, but
not solve my problem.
In my /etc/resolv.conf:
nameserver 127.0.1.1
In my /etc/hosts:
127.0.0.1 localhost
::1 localhost
27.0.1.1 esha
Where "esha" in error? If i have "localhost" in broker list ?
Now, i solve problem use "stub" in /producer.lua:
129 local config = brokers[leader]
130
131 config.host = "127.0.0.1"
132 local bk = broker:new(config.host, config.port, self.socket_config)
133 self.producer_brokers[leader] = bk
134
135 return bk
What would you recommend?
-- this is async producer_type and bp will be reused in the whole nginx worker
local bp = producer:new(broker_list, { producer_type = "async" })
local ok, err = p:send("test", key, message)
p == bp
this should be
bp:send("test", key, message)
Hi
Can this driver run on the phases other than 'content_by_lua'? If it can only run on 'content_by_lua', this will cause a conflict with proxy_pass which is also run on the 'content phase' of nginx.
How to make this kafka producer work well when I have an upstream configuration?
My main purpose is to output upstream_response_time to kafka.
Thank you
In the readme it is written that it is still in the finalization but the inscriptions are 3 years old.
2016/07/26 20:54:58 [error] 17646#0: *34 connect() failed (113: No route to host), client: 192.168.1.111, server: 192.168.1.160, request: "GET /meta HTTP/1.1", host: "192.168.1.160:4002"
2016/07/26 20:54:58 [error] 17646#0: *34 [lua] client.lua:209: _fetch_metadata(): all brokers failed in fetch topic metadata, client: 192.168.1.111, server: 192.168.1.160, request: "GET /meta HTTP/1.1", host: "192.168.1.160:4002"
2016/07/26 20:54:58 [error] 17646#0: *34 lua entry thread aborted: runtime error: content_by_lua(nginx.conf:71):16: attempt to index local 'brokers' (a nil value)
stack traceback:
coroutine 0:
content_by_lua(nginx.conf:71): in function <content_by_lua(nginx.conf:71):1>, client: 192.168.1.111, server: 192.168.1.160, request: "GET /meta HTTP/1.1", host: "192.168.1.160:4002"
I am using async kafka producer in our lua code, and we recently got situation than one of the kafka machines when down. Unfortunately, async client was continiously failing in this case, and I guess it was metadata request failing
The way we configure a kafka cluster in our lua config:
local config = {
["kafka"]={
{ host = "down", port = 9092 },
{ host = "foo", port = 9092 },
{ host = "bar", port = 9092 },
},
}
Host 'down' obviously went down during incident. Others were up. All hosts are configured by IP addresses, not the hostnames
The way we use async producer:
....
local function send_messages_to_kafka(messages)
local error_handle = function (topic, partition_id, queue, index, err, retryable)
ngx.log(ngx.ERR, "failed to send to kafka: " .. err)
end
local producer_kafka, err = kafkaproducer:new(config.kafka, { producer_type = "async", refresh_interval = 5000, error_handle = error_handle })
.....
And the result we observed in logs:
2016/04/24 12:04:06 [error] 22322#0: lua tcp socket connect timed out, context: ngx.timer, client: some.client.ip.here, server: 0.0.0.0:80
2016/04/24 12:04:06 [error] 22322#0: [lua] dataeye_logging_helper.lua:12: failed to send to kafka: timeout, context: ngx.timer, client: some.client.ip.here, server: 0.0.0.0:80
So my question here - is there any good workaround for this problem? I can't really understand from the resty-kafka code if it performs fallback across multiple kafka instances
I would like to ask if the lua-resty-kafka supports the transfer file
How to set up linger: ms value?
I added the request handler as example in README. However, fetch_metadata() seemed to return empty result which caused the error.
local cli = client:new(broker_list)
local brokers, partitions = cli:fetch_metadata("exp_khuang")
ngx.say("brokers: ", cjson.encode(brokers), "; partitions: ", cjson.encode(partitions)) -- Error thrown here.
I noticed the traceback is like client:fetch_metadata->broker:send_receive(). I added some dump in send_receive(). The request:packet() sent to kafka broker node is "true", and received data is nil. I am not sure where it goes wrong. Could anyone kindly suggest? Thanks a lot!
I'm try to send byte array to kafka.
local ok, err = bp:send("m5", key, msg_bytes)
and msg_bytes is byte table.
But there throws an error.
/lua-resty-kafka-master/lib/resty/kafka/request.lua:158: invalid value (table) at index 6 in table for 'concat'
Then i checked the request.lua ,and i found
local req = {
-- MagicByte
str_int8(0),
-- XX hard code no Compression
str_int8(0),
str_int32(key_len),
key,
str_int32(len),
msg,
}
local str = concat(req)
Seems i can't send bytes to kafka,because the client needs to concat the key and the message.
Is there any way i can send the bytes to kakfa directly , not encoding them with Base64?
I used hostname ,find error is
[error] 13484#0: *105 [lua] client.lua:150: _fetch_metadata(): all brokers failed in fetch topic metadata, client: 127.0.0.1, server: localhost, request: "GET /test HTTP/1.1", host: "localhost"
local broker_list = {
{ host = "storm01", port = 9092 }
}
I hope you can help me .
local producer = require "resty.kafka.producer"
local broker_list = {
{ host = "10.7.13.112", port = 9092 },
{ host = "10.7.13.114", port = 9092 },
{ host = "10.7.13.115", port = 9092 },
}
ngx.req.read_body()
local post = ngx.req.get_body_data()
local bp = producer:new(broker_list, { producer_type = "async" })
if post then
local ok, err = bp:send("footPrint_eventlog", nil,post)
if not ok then
ngx.log(ngx.ERR, "kafka send err:", err)
return
end
ngx.say("{"code":200,"description":"ok"}")
else
ngx.say("{"code":400,"description":"no content"}")
end
kafka 发送post数据成功 接受到消息 中文有乱码 post通过ngx.log 输出文件没有乱码 求问大神什么原因
return:
brokers: [{"host":"localhost","port":9092}]; partitions: {"0":{"id":0,"errcode":0,"replicas":[1],"isr":[1],"leader":1},"1":{"id":1,"errcode":0,"replicas":[1],"isr":[1],"leader":1},"errcode":0,"num":2}
send err1:no resolver defined to resolve "localhost"
code:
local cjson = require "cjson"
local client = require "resty.kafka.client"
local producer = require "resty.kafka.producer"
local broker_list = {
--{ host = "192.168.1.1", port = 9092 },
{ host = "127.0.0.1", port = 9092 }
}
local key = "key"
local message = "halo world"
-- usually we do not use this library directly
local cli = client:new(broker_list)
local brokers, partitions = cli:fetch_metadata("test")
if not brokers then
ngx.say("fetch_metadata failed, err:", partitions)
end
ngx.say("brokers: ", cjson.encode(brokers), "; partitions: ", cjson.encode(partitions))
-- sync producer_type
local p = producer:new(broker_list)
local offset, err = p:send("test", key, message)
if not offset then
ngx.say("send err1:", err)
return
end
ngx.say("send success, offset: ", tonumber(offset))
How can I solve it?
this is my code , the result show: send success, offset: true
but in customer i can't get anything;
location ^~/1 { default_type 'text/html'; access_log off; access_by_lua ' local producer = require "resty.kafka.producer" local broker_list = { { host = "192.168.210.24", port = 6667 }, { host = "192.168.210.25", port = 6667 }, } local producer_config = {producer_type = "async"} local p = producer:new(broker_list , producer_config) local offset, err = p:send("usertrajectory" , nil , "1111111111111111111112222222222" ) if not offset then ngx.say("send err:", err) return end ngx.say("send success, offset: ", offset) return '; }
Hi,
I tried you producer example, but it's not working. I tried to send a message, but kafka doesn't receive it.
LUA gives me no errors, and both the "err" and the "resp" variable are nil.
I'm sure I can connect to Kafka from that machine because I tested the connection with kafkacat and the sending of messages via CLI works.
Hi - We are running a portion of our network through OpenResty / Kafka using your driver, in order to test it out. We are producing about 100-200 messages per second onto Kafka.
When we set producer_type = async
, memory utilization increases until the following error is output to our logs:
*emergency(): Unable to connect to kafka: buffer overflow
Can you please help us understand what is causing this memory leak? Thanks!
if i use content_by_lua_file,send message to kafka0.9.0.2.3 is ok,but use log_by_lua_file meet err
nginx error log:
[error] 30952#0: *30248161 lua tcp socket read timed out, context: ngx.timer, client: xxxx, server: xxxx
[error] 30952#0: *30248161 [lua] producer.lua:258: buffered messages send to kafka err: timeout, retryable: nil, topic:xxxx, partition_id: 1, length: 1, context: ngx.timer, client: xxxx, server: xxxx
but it did not happen frequently. kafka works fine and everything is OK, I have set request_timeout 10000ms like this:
local bp = producer:new(broker_list, { producer_type = "async", request_timeout = 10000 })
but it seems to be not work.
could you give me a favor?
Hi,
Not sure why I'm getting this error and the topic do exist. I also tested it through client python with the same topic and that worked.
....
....
local broker_list = {
{ host = "127.0.0.1", port = 9092 },
}
local key = "key1"
local message = "hello world"
local p = producer:new(broker_list)
local offset, err = p:send("test", key,message)
if not offset then
ngx.say("send err:", err)
return
end
ngx.say("send success, offset: ", tostring(offset))
Using:
Openresty 1.7.2.1
kafka server 2.10-0.8.2.0
Also, I tested on 0.8.1 and 0.8.1.1
Thanks,
HI,ALL:
when I restart my kafka
cluster, My resty kafka product will can't send message. I mush reload my nginx then it will be OK . How to do resty kafka product
,so it will be automatically check the kafka cluser .
producer.lua:258: buffered messages send to kafka err: not found broker, retryable: true, topic: test, partition_id: 0, length: 219, context: ngx.timer
local cli = client:new(broker_list)
local brokers, partitions = cli:fetch_metadata(kafka.topic)
if not brokers then
ngx.log(ngx.ERR, "fetch_metadata failed, err:", partitions)
end
ngx.log(ngx.ERR, "brokers: ", #brokers, "; partitions: ", cjson.encode(partitions))
I use the code above in my macos environment and I can ensure the broker_list is available and is it bug to just return a empty brokers? but partitions info is ok
We use the lua-resty-kafka to send our log to the kafka.
The qps is 6K+ , size per request is 0.6K.
However we see many buffer overflow errors in the errlog.
Andr i found this error in ringbuffer.lua .
function _M.add(self, topic, key, message)
local num = self.num
local size = self.size
if num >= size then
return nil, "buffer overflow"
end
local index = (self.start + num) % size
local queue = self.queue
queue[index] = topic
queue[index + 1] = key
queue[index + 2] = message
self.num = num + 3
return true
end
What config should i set?
And what does this error mean?
In the readme example, a broker is instantiated in the scope of a location. Is it possible to do the require statement in an init_by_lua_block? I have the same question for the bp?
So will the following work as expected, or do you believe it not to be thread-safe ?
http {
lua_package_path "/data/programs/lua-resty-kafka/lib/?.lua;;";
init_by_lua_block {
cjson = require("cjson")
local kafka_broker_list = {{host = "kafka1.mydomain.com", port = 9092}}
local kafka_producer_config = {producer_type = "async"}
-- can this be instantiated here ?
local producer = require "resty.kafka.producer"
-- edited this line: made bp global VM level
bp = producer:new(kafka_broker_list, kafka_producer_config)
}
server {
location /test {
content_by_lua_block {
local args, err = ngx.req.get_uri_args()
local message = {
test_arg1 = args.test_arg1 or nil
}
message = cjson.encode(message)
local ok, err = bp:send("test_topic", "test_key", message)
if not ok then
ngx.say("send err:", err)
return
end
ngx.say("send success, ok:", ok)
}
}
}
}
1 部署环境
openresty-1.9.15.1.tar.gz + 最新lua-resty-kafka + CDH5.4.0(kafka1.3.0)+ CentOS6.4 64位
2 lua-resty-kafka相应的nginx配置文件内容
server {
listen 8081;
server_name 192.168.15.84;
location /test {
content_by_lua '
local cjson = require "cjson"
local client = require "resty.kafka.client"
local producer = require "resty.kafka.producer"
local broker_list = {
{ host = "192.168.15.81", port = 9092 },
{ host = "192.168.15.82", port = 9092 },
{ host = "192.168.15.83", port = 9092 }
}
local key = "key"
local message = "halo world"
-- usually we do not use this library directly
local cli = client:new(broker_list)
local brokers, partitions = cli:fetch_metadata("test")
if not brokers then
ngx.say("fetch_metadata failed, err:", partitions)
end
ngx.say("brokers: ", cjson.encode(brokers), "; partitions: ", cjson.encode(partitions))
-- sync producer_type
local p = producer:new(broker_list)
local offset, err = p:send("test", key, message)
if not offset then
ngx.say("send err:", err)
return
end
ngx.say("send success, offset: ", tonumber(offset))
-- this is async producer_type and bp will be reused in the whole nginx worker
local bp = producer:new(broker_list, { producer_type = "async" })
local ok, err = bp:send("test", key, message)
if not ok then
ngx.say("send err:", err)
return
end
ngx.say("send success, ok:", ok)
';
}
}
3 请求例子
curl -X POST -H ' charset=UTF-8' -d 'sssss' http://192.168.15.84:8081/test
4 出现的问题(数据不能写入kafka),日志详情如下:
2016/06/21 18:10:30 [error] 22381#0: *1 lua entry thread aborted: runtime error: content_by_lua(lua-kafka.conf:50):21: Cannot serialise table: excessively sparse array
stack traceback:
coroutine 0:
[C]: in function 'encode'
content_by_lua(lua-kafka.conf:50):21: in function <content_by_lua(lua-kafka.conf:50):1>, client: 192.168.15.84, server: 192.168.15.84, request: "POST /test HTTP/1.1", host: "192.168.15.84:8081"
5 非常感谢,期待您的回答。
I have multi route in nginx config, each route register a kafka topic to produce message into each kafka cluster. The problem I have come across is that when I add a new route to send one topic into a new kafka cluster with the producer_type = "async"
parameter, I saw the new topic messages still send into the kafka cluster I have configiured in the old route. I solve this problem by removing the producer_type = "async"
parameter.
May this will help.
Hi,
Could you provide a little example about how to install and use the library?
if I send fast, the error log:
[lua] producer.lua:280: _flush_buffer(): failed to create timer at _flush_buffer, err: too many pending timers.
But I use the ngx.timer.pending_count() and ngx.timer.running_count() function watching the pending and running value. The pending value max is 2 and the running value max is 1.
This is my nginx.conf setting:
lua_max_pending_timers 1024;
lua_max_running_timers 256;
Thanks!
2015/08/14 15:03:34 [error] 17858#0: *122 failed to run log_by_lua*: error loading module 'resty.kafka.response' from file '/usr/local/sh
1306 /usr/local/share/lua/5.1/resty/kafka/response.lua:76: malformed number near '4294967296LL'
1307 stack traceback:
1308 [C]: ?
1309 [C]: in function 'require'
1310 /usr/local/share/lua/5.1/resty/kafka/producer.lua:4: in main chunk
1311 [C]: in function 'require'
1312 /etc/nginx/lua/log/log_into_kafka.lua:1: in function </etc/nginx/lua/log/log_into_kafka.lua:1> while logging request, client: 14.153.
1313 2015/08/14 15:03:35 [error] 17858#0: *122 failed to run log_by_lua*: /etc/nginx/lua/log/log_into_kafka.lua:1: loop or previous error load
1314 stack traceback:
1315 [C]: in function 'require'
I run the following code:
local p = producer:new(broker_list)
local offset, err = p:send(topic, key, body)
if not offset then
ngx.log(ngx.ERR, err)
return ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR)
end
I've got some 500 error, but when i log the error i don't have anything either.
2017/01/20 16:33:02 [error] 10#10: *719 [lua] script.lua:33: , client: 10.10.10.10, server: api.example.com, request: "POST /some/path HTTP/1.1", host: "api.example.com"
I can't figure out how to debug this. any idea ?
The error log is:
2016/07/27 14:02:56 [error] 7253#0: *25 recv() failed (104: Connection reset by peer), client: 192.168.1.111, server: 192.168.1.160, request: "GET /collect HTTP/1.1", host: "192.168.1.160:4002"
and the config is:
location /collect {
lua_need_request_body on;
client_max_body_size 5M;
client_body_buffer_size 256k;
default_type application/json;
content_by_lua '
local cjson = require "cjson"
local client = require "resty.kafka.client"
local producer = require "resty.kafka.producer"
local broker_list = {
{ host = "192.168.1.151", port = 9092 }
}
local key = "key"
local message = "halo world"
-- sync producer_type
local p = producer:new(broker_list)
local offset, err = p:send("demo2", key, message)
if not offset then
ngx.say("send err:", err)
return
end
ngx.say("send success, offset: ", tonumber(offset))
ngx.say("send success, ok:", ok)
';
}
java.lang.IllegalArgumentException: A metric named 'MetricName [name=throttle-time, group=Produce, description=Tracking average throttle-time per client, tags={client-id=worker:26556}]' already exists, can't register another one
Can lua-resty-kafka be used as a Kafka data consumer, or just a Kafka data producer?
hello
thank you for your provide this tool.
I met a problem
How to set up the topic of the partition number and replication number
Really looking forward to your answer
Server1: ip is 100.100.100.101(something like this)
There are nginx, lua-resty-kafka, interface which will be called using http protocol. The interface will call lua-resty-kafka as producer
Server2: ip is 200.100.100.101(something like this)
Kafka running on server2 in single mode.
Server2 's /etc/hosts content is:
127.0.0.1 localhost
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
200.100.100.101 tmp.cn
When i run the interface on server1, it will give error send err:tmp.cn could not be resolved
I already use issue#5 for reference to set parameters
On server2, set host.name=127.0.0.1 or host.name=200.100.100.101
and
On server 1, nginx configuration set resolver 200.100.100.101
restart nginx on server1
These methods still can not solve my problem
It still give out this error
I wonder why it need to access domain name of kafka server, only ip address and port is not enough to communicate with kafka server?
Thank you
I've attempted to use the Kafka producer to send messages to a Kafka instance on the same local private network. Here's an example of the relevant kafka producer code that I'm using:
_kafka_broker_list = {
{host = "10.10.250.10", port = 9092}
}
_kafka_producer = _kafka:new(_kafka_broker_list, {producer_type = "async"})
-- Other work gets done here
local kafka_send_ok, kafka_send_err = _kafka_producer:send(list, "nginx", '{"message": "this is a test", "type": "test"}')
if not kafka_send_ok then
ngx.log(ngx.ERR, "Could not send message to Kafka broker:", kafka_send_err)
return
end
but unfortunately I get this error when I start nginx:
2017/05/09 18:07:17 [error] 8#8: init_by_lua error: /usr/local/openresty/site/lualib/resty/kafka/producer.lua:282: no request
stack traceback:
[C]: in function 'timer_at'
/usr/local/openresty/site/lualib/resty/kafka/producer.lua:282: in function '_flush_buffer'
/usr/local/openresty/site/lualib/resty/kafka/producer.lua:291: in function '_timer_flush'
/usr/local/openresty/site/lualib/resty/kafka/producer.lua:333: in function 'new'
init_by_lua:51: in main chunk
I can't seem to figure out why I'm getting this. Would you have any ideas? Thanks for your help.
I mean, how to access the producer instance I created later
local producer_config = {
producer_type = "async",
error_handle = function(topic, partition_id, message_queue, index, err, retryable)
if retryable then
for i = 1,index then
local key = message_queue[i * 2 - 1]
local message = message_queue[i * 2]
-- how to retry ?
end
else
ngx.log(ngx.ERR, "send to kafka error and retryable is false", err)
end
end
}
local bp = producer:new(broker_list, producer_config)
local topic = "nginx_log"
hi ~ @doujiang24:
I have a problem about message send success (sended status),but kafka can not receive message. this problem ocurr after a day or two when starting my nginx. if you use commad "nginx -s reload or restart",this problem will be killed(send message success ,kafka also receive message).
follow is my lua script and nginx configure:
`
local cjson = require "cjson"
local client = require "resty.kafka.client"
local producer = require "resty.kafka.producer"
local broker_list = {
{ host = "192.168.100.29", port = 9092 },
{ host = "192.168.100.30", port = 9092 },
{ host = "192.168.100.31", port = 9092 },
-- { host = "sz-space3.novalocal", port = 9092 },
-- { host = "sz-space4.novalocal", port = 9092 },
-- { host = "sz-space5.novalocal", port = 9092 }
}
local key = os.date("%s")
local body = ngx.req.get_body_data()
local message = body
local cli = client:new(broker_list)
-- local brokers, partitions = cli:fetch_metadata("cleancenter_spaceplus_cleantrack_test_topic")
local brokers, partitions = cli:fetch_metadata("cleancenter_spaceplus_cleantrack_topic0")
if not brokers then
ngx.say("fetch_metadata failed, err:", partitions)
return
end
-- ngx.say("brokers: ", cjson.encode(brokers), "; partitions: ", cjson.encode(partitions))
-- local p = producer:new(broker_list)
-- local offset, err = p:send("cleancenter_spaceplus_cleantrack_topic0", key, message)
-- if not offset then
-- ngx.say("send err:", err)
-- return
-- end
-- ngx.say("send success, offset: ", tonumber(offset))
-- this is async producer_type and bp will be reused in the whole nginx worker
local bp = producer:new(broker_list, { producer_type = "async" })
-- local ok, err = bp:send("cleancenter_spaceplus_cleantrack_test_topic", key, message)
local ok, err = bp:send("cleancenter_spaceplus_cleantrack_topic0", key, message)
if not ok then
ngx.say("send err:", err)
return
end
ngx.log(ngx.ERR,"test","ssss")
ngx.say("send success, ok:", ok)
-- ngx.say("send success, ok:", message)
# spot point log config begin
lua_package_path "/data/softwares/openresty-1.9.15.1/lualib/?.lua;;";
lua_need_request_body on;
server {
#listen port
listen 8080;
server_name 127.0.0.1;
location /clean/track {
content_by_lua_file /data/softwares/openresty-1.9.15.1/nginx/conf/lua/spot_point_log1.lua;
#log_by_lua /data/softwares/openresty-1.9.15.1/nginx/conf/lua/spot_point_log1.lua;
}
}
`
Failed to connect to Kafka [lua] client.lua:150: _fetch_metadata(): all brokers failed in fetch topic metadata
return result :fetch_metadata failed, err:not foundd topic
but topic is already created
why?
Hi there,
I'm trying to use this module but get errors like below. Do you have any idea why this is? This "domain-test" topic should be existing on my broker.
2014/12/17 10:37:55 [error] 32594#0: *31 lua entry thread aborted: runtime error: /usr/share/lualib/resty/kafka/request.lua:167: 'for' limit must be a number
stack traceback:
coroutine 0:
/usr/share/lualib/resty/kafka/request.lua: in function 'message_set'
/usr/share/lualib/resty/kafka/producer.lua:76: in function 'produce_encode'
/usr/share/lualib/resty/kafka/producer.lua:248: in function 'send'
/etc/nginx/lua/info_tests.lua:42: in function </etc/nginx/lua/info_tests.lua:1>,
with info-tests.lua being something like this (line numbers are not correct):
local cjson = require "cjson"
local producer = require "resty.kafka.producer"
local broker_list = {
{ host = "xxxx", port = xxx },
}
local key = "key"
local message = "halo world"
local bp = producer:new(broker_list, { producer_type = "async" })
local size, err = bp:send("domain-test", key, message)
if not size then
ngx.say("send err:", err)
return
end
2016/12/22 22:32:33 [error] 13413#0: *378703 [lua] producer.lua:258: buffered messages send to kafka err: MessageSizeTooLarge, retryable: false, topic: ns-source-wimonitor, partition_id: 2, length: 111, context: ngx.timer, client: 120.237.52.234, server: 0.0.0.0:443
I have got a error from error.log. should i increase the message size? and what should i do?
When I connected lua-resty-kafka to 0.9.0.1, the errocode and offset field in the response are both -1. There will be error message in nginx error.log (line 258, producer.lua)
After I change line 153 in producer.lua as follows, everything works fine.
if errcode == 0 or errocode == -1 then
Is there any plan to upgrade the protocol implementation to support Kafka api version 2?
BTW, @doujiang24 great project. Thanks
i use zookeeper to manger the kafka,and the broker information in the zookeeper is like this
[zk: localhost:2181(CONNECTED) 1] get /brokers/ids/1001
{"jmx_port":-1,"timestamp":"1492749719841","endpoints":["PLAINTEXT://hdp5.novalocal:6667"],"host":"hdp5.novalocal","version":3,"port":6667}
when my nginx connect with kafka,I can not connect with kafka.
the error message is
no resolver defined to resolve "hdp5.novalocal"
I try to config my hosts file,and i add ip and namesever ,but it can not work.
how can i resolve this issue?
Nginx.conf
ngx.say("[start lua kafka]")
local cjson = require "cjson"
local client = require "resty.kafka.client"
local producer = require "resty.kafka.producer"
local broker_list = {
{ host = "127.0.0.1", port = 9094},
}
local topic = "hellokafka";
local cli = client:new(broker_list)
local brokers, partitions = cli:fetch_metadata(topic)
ngx.say("[fetch_metadata]");
if not brokers then
ngx.say("fetch_metadata failed, err: ", partitions)
end
ngx.say("brokers: ", cjson.encode(brokers), ";\npartitions: ", cjson.encode(partitions), ";")
ngx.say("[send message]");
local key = nil
local message = "hello nginx kafka lua"
local bp = producer:new(broker_list)
local ok, err = bp:send(topic, nil, message)
if not ok then
ngx.say("send err:", err)
return
end
ngx.say("send success, ok:", ok)
Output:
[start lua kafka]
[fetch_metadata]
brokers: [null,{"host":"cp01-rdqa-dev340.cp01.baidu.com","port":9094}];
partitions: {"errcode":0,"num":1,"0":{"id":0,"errcode":0,"replicas":[2],"isr":[2],"leader":2}};
[send message]
send err:no resolver defined to resolve "cp01-rdqa-dev340.cp01.baidu.com"
Can't resolve the Hostname? Please Help me...
RT
when i require resty.kafka.producer , the error log show:
2016/08/26 14:10:54 [error] 11222#0: *284 lua entry thread aborted: runtime error: error loading module 'resty.kafka.response' from file '/usr/local/lib/lua/resty/kafka/response.lua':
/usr/local/lib/lua/resty/kafka/response.lua:76: malformed number near '4294967296LL'
stack traceback:
coroutine 0:
[C]: ?
[C]: in function 'require'
/usr/local/lib/lua/resty/kafka/producer.lua:4: in main chunk
[C]: in function 'require'
Hi,
Is it possibile to manually select the kafka partition, when sending messages?
I noticed that in the current implementation all messages are sent to partition 1, is it correct?
is there any plan to support ssl?
How to connect to zookeeper?
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.