youzan / nsq
A realtime distributed messaging platform (forked from https://github.com/nsqio/nsq)
Home Page: http://nsq.io/
License: MIT License
flume_param.go:getLocalIP failed.
Get IPAddress error: lookup iZuf6074m10zsynrhd1yv0Z on 100.100.2.138:53: no such host
Log file created at: [/tmp]
I'm running the binary release.
Startup fails in a single-machine setup
with the following error:
root@iZbp12m3z176dptz83k3i2Z:/usr/local/nsq# ./bin/nsqlookupd -config=./etc/nsqlookupd.conf
Log file created at: [/usr/local/nsq/log /tmp]
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x10 pc=0xa2c7fb]
goroutine 24 [running]:
github.com/absolute8511/xlock2.(*EtcdClient).Watch(0x0, 0xc4201c5c00, 0x1a, 0x0, 0x1, 0x0, 0x0)
/Users/vincentlee/workspace/go/src/github.com/absolute8511/xlock2/etcd_client.go:121 +0x4b
github.com/youzan/nsq/consistence.(*NsqLookupdEtcdMgr).watchTopics(0xc42013db00)
/Users/vincentlee/workspace/go/src/github.com/youzan/nsq/consistence/nsq_lookupd_etcd.go:376 +0x67
created by github.com/youzan/nsq/consistence.(*NsqLookupdEtcdMgr).InitClusterID
/Users/vincentlee/workspace/go/src/github.com/youzan/nsq/consistence/nsq_lookupd_etcd.go:86 +0x127
Config file:
## <addr>:<port> to listen on for TCP clients
tcp_address = "0.0.0.0:4160"
## <addr>:<port> to listen on for HTTP clients
http_address = "0.0.0.0:4161"
## rpc port used for cluster communication
rpc_port = "4260"
## address that will be registered with lookupd (defaults to the OS hostname)
#broadcast_address = ""
## the network interface for broadcast, the ip will be detected automatically.
# use this setting instead of broadcast_address so that every node can share the same config
broadcast_interface = "eth0"
## local reverse proxy port, basically used for collecting the stats
# reverse_proxy_port = "4163"
cluster_id = "test-nsq"
## the etcd cluster ip list
cluster_leadership_addresses = "127.0.0.1:2379"
## duration of time a producer will remain in the active list since its last ping
inactive_producer_timeout = "100s"
# should be at least twice the ping interval on nsqd
nsqd_ping_timeout= "15s"
## duration of time a producer will remain tombstoned if registration remains
tombstone_lifetime = "45s"
## the detail of the log, larger number means more details
log_level = 2
## if empty, use the default flag value in glog
log_dir = "/usr/local/nsq/log"
## the time period (in hours) during which balancing is allowed.
balance_interval = ["4", "5"]
## allow a topic to be returned as writable even when it has no channels
allow_write_with_nochannels = true
It looks like the error occurs while reading from etcd.
But operating on etcd locally with etcdctl works fine.
etcd version: 3.2.9
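Since the panic fires inside the etcd client wrapper (`EtcdClient.Watch` on a nil receiver), a useful first check is what the configured `cluster_leadership_addresses` endpoint actually answers. The following is a minimal diagnostic sketch, not part of the project: it queries etcd's `/version` HTTP endpoint and prints the server and cluster versions. The helper names are hypothetical, and the helper expects the endpoint to include a scheme such as `http://`.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"os"
	"time"
)

// etcdVersion mirrors the JSON returned by etcd's /version endpoint,
// e.g. {"etcdserver":"3.2.9","etcdcluster":"3.2.0"}.
type etcdVersion struct {
	Server  string `json:"etcdserver"`
	Cluster string `json:"etcdcluster"`
}

// parseEtcdVersion decodes a /version response body.
func parseEtcdVersion(body []byte) (etcdVersion, error) {
	var v etcdVersion
	if err := json.Unmarshal(body, &v); err != nil {
		return v, fmt.Errorf("not an etcd /version response: %w", err)
	}
	if v.Server == "" {
		return v, fmt.Errorf("missing etcdserver field in %q", body)
	}
	return v, nil
}

// fetchEtcdVersion requests <endpoint>/version with a short timeout.
func fetchEtcdVersion(endpoint string) (etcdVersion, error) {
	client := &http.Client{Timeout: 3 * time.Second}
	resp, err := client.Get(endpoint + "/version")
	if err != nil {
		return etcdVersion{}, err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return etcdVersion{}, err
	}
	return parseEtcdVersion(body)
}

func main() {
	// Hypothetical default; pass one entry of cluster_leadership_addresses
	// (with its http:// prefix) as the first argument.
	endpoint := "http://127.0.0.1:2379"
	if len(os.Args) > 1 {
		endpoint = os.Args[1]
	}
	v, err := fetchEtcdVersion(endpoint)
	if err != nil {
		fmt.Println("etcd check failed:", err)
		os.Exit(1)
	}
	fmt.Printf("etcd server %s, cluster %s\n", v.Server, v.Cluster)
}
```

If this prints a version, the endpoint is reachable over HTTP; whether the fork's client supports that etcd version is a separate question.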
I started everything with docker-compose. nsqlookupd and nsqd both came up, but nsqadmin exited without any error.
This is all the log output:
Log file created at: 2017/10/30 16:26:25
Running on machine: 5d7074420ede
Binary: Built with gc go1.9.1 for linux/amd64
Log line format: [IWEF]mmdd hh:mm:ss.uuuuuu threadid file:line] msg
Here is my configuration:
docker-compose.yml
####### nsqlookupd #######
nsqlookupd:
  image: nsqio/nsq:v0.3.7-HA.1.6.0
  volumes:
    - ~/project/nsq/data:/data
  ports:
    - 4160:4160
    - 4161:4161
  command: /nsqlookupd -config /data/nsqlookupd.toml
  networks:
    - backend
####### nsqd #######
nsqd:
  image: nsqio/nsq:v0.3.7-HA.1.6.0
  volumes:
    - ~/project/nsq/data:/data
  ports:
    - 4150:4150
    - 4151:4151
  command: /nsqd -config /data/nsqd.toml
  networks:
    - backend
####### nsqadmin #######
nsqadmin:
  image: nsqio/nsq:v0.3.7-HA.1.6.0
  volumes:
    - ~/project/nsq/data:/data
  ports:
    - 4170:4170
    - 4171:4171
  command: /nsqadmin -config /data/nsqadmin.toml
  networks:
    - backend
nsqlookupd.toml
## <addr>:<port> to listen on for TCP clients
tcp_address = "0.0.0.0:4160"
## <addr>:<port> to listen on for HTTP clients
http_address = "0.0.0.0:4161"
## rpc port used for cluster communication
rpc_port = "4260"
## address that will be registered with lookupd (defaults to the OS hostname)
#broadcast_address = ""
## the network interface for broadcast, the ip will be detected automatically.
# use this setting instead of broadcast_address so that every node can share the same config
broadcast_interface = "eth0"
## local reverse proxy port, basically used for collecting the stats
# reverse_proxy_port = "4163"
cluster_id = "test-nsq-cluster-dev-1"
## the etcd cluster ip list
cluster_leadership_addresses = "http://etcd0:2379,http://etcd1:2379,http://etcd2:2379"
## duration of time a producer will remain in the active list since its last ping
inactive_producer_timeout = "100s"
# should be at least twice the ping interval on nsqd
nsqd_ping_timeout= "15s"
## duration of time a producer will remain tombstoned if registration remains
tombstone_lifetime = "45s"
## the detail of the log, larger number means more details
log_level = 2
## if empty, use the default flag value in glog
log_dir = "./"
## the time period (in hours) during which balancing is allowed.
balance_interval = ["4", "5"]
## allow a topic to be returned as writable even when it has no channels
allow_write_with_nochannels = true
nsqd.toml
## unique identifier (int) for this worker (will default to a hash of hostname)
# id = 5150
#
## cluster id to separate different cluster
cluster_id = "test-nsq-cluster-dev-1"
## the etcd cluster ip list
cluster_leadership_addresses = "http://etcd0:2379,http://etcd1:2379,http://etcd2:2379"
## rpc port used for node communication within the cluster; the IP will be the same as the broadcast address
rpc_port = "4250"
## address that will be registered with lookupd (defaults to the OS hostname)
#broadcast_address = ""
broadcast_interface = "eth0"
## <addr>:<port> to listen on for TCP clients
tcp_address = "0.0.0.0:4150"
## <addr>:<port> to listen on for HTTP clients
http_address = "0.0.0.0:4151"
## <addr>:<port> to listen on for HTTPS clients
# https_address = "0.0.0.0:4152"
## local reverse proxy port, basically used for collecting the stats
# reverse_proxy_port = "4153"
## keep-alive heartbeat interval between the nsqd node and nsqlookupd
lookup_ping_interval = "5s"
## path to store disk-backed messages
# data_path = "/var/lib/nsq"
## the remote message trace server
# remote_tracer = "127.0.0.1:1234"
## default retention days to keep the consumed topic data
retention_days = 7
## number of messages to keep in memory (per topic/channel)
mem_queue_size = 10000
## number of bytes per diskqueue file before rolling
max_bytes_per_file = 104857600
## number of messages per diskqueue fsync
sync_every = 2500
## duration of time per diskqueue fsync (time.Duration)
sync_timeout = "2s"
## duration to wait before auto-requeuing a message
msg_timeout = "60s"
## maximum duration before a message will timeout
max_msg_timeout = "15m"
## maximum size of a single message in bytes
max_msg_size = 1024768
## maximum requeuing timeout for a message
max_req_timeout = "24h"
## duration threshold for requeuing a message to the end of the delayed queue
req_to_end_threshold = "15m"
## maximum size of a single command body
max_body_size = 5123840
## maximum finished count with unordered
max_confirm_win = 5000
## maximum client configurable duration of time between client heartbeats
max_heartbeat_interval = "60s"
## maximum RDY count for a client
max_rdy_count = 2500
## maximum client configurable size (in bytes) for a client output buffer
max_output_buffer_size = 65536
## maximum client configurable duration of time between flushing to a client (time.Duration)
max_output_buffer_timeout = "1s"
## UDP <addr>:<port> of a statsd daemon for pushing stats
# statsd_address = "127.0.0.1:8125"
## prefix used for keys sent to statsd (%s for host replacement)
statsd_prefix = "nsq.%s"
## the protocol for statsd
statsd_protocol="tcp"
## duration between pushing to statsd (time.Duration)
statsd_interval = "60s"
## toggle sending memory and GC stats to statsd
statsd_mem_stats = true
## message processing time percentiles to keep track of (float)
e2e_processing_latency_percentiles = [
100.0,
99.0,
95.0
]
## calculate end to end latency quantiles for this duration of time (time.Duration)
e2e_processing_latency_window_time = "10m"
## path to certificate file
tls_cert = ""
## path to private key file
tls_key = ""
## set policy on client certificate (require - client must provide certificate,
## require-verify - client must provide verifiable signed certificate)
# tls_client_auth_policy = "require-verify"
## set custom root Certificate Authority
# tls_root_ca_file = ""
## require client TLS upgrades
tls_required = false
## minimum TLS version ("ssl3.0", "tls1.0", "tls1.1", "tls1.2")
tls_min_version = ""
## enable deflate feature negotiation (client compression)
deflate = true
## max deflate compression level a client can negotiate (> values == > nsqd CPU usage)
max_deflate_level = 6
## enable snappy feature negotiation (client compression)
snappy = true
## bigger level means more log details
log_level = 2
## if empty, use the default flag value in glog
log_dir = "./"
## the time period (in hours) during which auto clean is allowed.
auto_clean_interval = ["2", "4"]
## whether we should fix the data if only one ISR is available
start_as_fix_mode = true
## the interval between scans for timed-out channel messages
queue_scan_interval = "100ms"
## number of channels selected for each timeout scan
queue_scan_selection_count = 100
## maximum number of timeout scan workers
queue_scan_worker_pool_max = 8
## trigger an immediate re-scan if at least this fraction of the selected channels had timed-out messages
queue_scan_dirty_percent = 0.1
nsqadmin.toml
## <addr>:<port> to listen on for HTTP clients
http_address = ":4171"
## graphite HTTP address
graphite_url = ""
## proxy HTTP requests to graphite
proxy_graphite = false
## prefix used for keys sent to statsd (%s for host replacement, must match nsqd)
statsd_prefix = "nsq.%s"
## format of statsd counter stats
statsd_counter_format="%s"
statsd_gauge_format="%s"
## time interval nsqd is configured to push to statsd (must match nsqd)
statsd_interval = "60s"
## HTTP endpoint (fully qualified) to which POST notifications of admin actions will be sent
notification_http_endpoint = ""
## nsqlookupd HTTP addresses
nsqlookupd_http_addresses = [
"http://nsqlookupd:4161"
]
## nsqd HTTP addresses (optional)
nsqd_http_addresses = [
"http://nsqd:4151"
]
log_dir = "/data/logs/nsqadmin"
#trace_query_url = ""
#trace_app_id = ""
#trace_app_name = ""
#trace_log_index_id = ""
#trace_log_index_name = ""
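Question 1: The Nodes page shows two nodes, on ports 4161 and 5161. The Lookup page shows 127.0.0.1:4161 (configured) and 127.0.0.1:4161 (Leader), but nothing for 127.0.0.1:5161.
Question 2: With both nodes (ports 4161 and 5161) running, creating a topic offers no way to choose which node it is created on; it is always created on 4161. After shutting down 4161 and creating a topic again, it is created on 5161.
Question 3: The Nodes page shows two nodes, on ports 4161 and 5161. What is the relationship between them? Do they synchronize data?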
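Kafka triggers a rebalance when brokers or consumers are added or removed, which leaves consumers unable to consume messages for a while. Does this modified nsq have the same problem?
I'd like to study the relevant code.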
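We're planning to adopt Youzan's nsq across the board. Is there a QQ/WeChat group or a personal contact? We're worried that we won't be able to resolve problems quickly when they come up.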
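After reading the four articles in the "redesigned nsq" series on Youzan's tech blog, I noticed that many of the ideas used to enhance nsq are borrowed from Kafka,
for example: disk storage for data reliability and replay, partition replication for backups, etcd for master election (Kafka uses ZooKeeper), and client-side cursors for ordered consumption.
It seems you have turned a fairly lightweight message broker into one with fewer features than Kafka but heavier than the original nsq.
So my question is: if compatibility is not a concern, should I just choose Kafka, or Youzan's nsq? Is there any performance benchmark data?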
Windows environment.
When I run the command nsqlookupd -config=C:\nsq\config\nsqlookupd.cfg in cmd, I get the error message
"nsqlookupd.go:123] nsqlookupd.(*NSQLookupd).Main FATAL: start coordinator failed - client: etcd cluster is unavailable or misconfigured"
Do I need to install "github.com/coreos/etcd" and set ETCDCTL_API=3 in the environment, so that etcdctl uses etcd API version 3 instead of defaulting to version 2?
I'm following the Youzan NSQ Operations Guide (有赞NSQ运维指南). Is there a simple installation guide?
Many thanks!
Requesting a list of recommended machine specs for a cluster.
I think it would be a good idea to rename the project to make it more popular. It's awkward for it to have the same name and use the same TCP ports while not being compatible with the existing nsq infrastructure and client APIs.
Since it's like Kafka, you could call it ksq or something.
Currently it's hard to run bitly's nsq and Youzan's nsq on the same infrastructure.
nsqd: support filtering out messages that carry the zan_test JSON ext header.
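As a rough illustration of the check this request implies, here is a sketch that decides whether a message's JSON extension header marks it as a test message. It assumes, hypothetically, that the ext header is a flat JSON object and that any truthy `zan_test` value should be filtered; the real header layout in youzan/nsq may differ, and `isTestMessage` is not a project function.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// isTestMessage reports whether a JSON extension header carries a truthy
// "zan_test" key. The header format is an assumption for illustration.
func isTestMessage(extHeader []byte) bool {
	if len(extHeader) == 0 {
		return false
	}
	var fields map[string]interface{}
	if err := json.Unmarshal(extHeader, &fields); err != nil {
		return false // malformed headers are delivered, not dropped
	}
	switch v := fields["zan_test"].(type) {
	case bool:
		return v
	case string:
		return v == "true" || v == "1"
	case float64: // encoding/json decodes JSON numbers as float64
		return v != 0
	default:
		return false
	}
}

func main() {
	headers := []string{`{"zan_test":true}`, `{"zan_test":"false"}`, `{"trace_id":"abc"}`, ``}
	for _, h := range headers {
		fmt.Printf("%-22q filtered=%v\n", h, isTestMessage([]byte(h)))
	}
}
```

Treating malformed headers as "not a test message" is a design choice that errs on the side of delivering messages.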
nsqd/delay_queue.go:31:45: constant 4294967296 overflows int
Changing it to
largeDBSize int64 = 1024 * 1024 * 1024 * 4
makes it compile.
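The compile error comes from Go's constant rules: 4294967296 is 2^32, and an untyped constant that defaults to `int` must fit in the platform's `int`, which is 32 bits on targets such as GOARCH=386 or arm. A minimal sketch of the fix above, in isolation (only the variable name comes from the issue; the rest is illustrative):

```go
package main

import (
	"fmt"
	"math"
)

// With an explicit int64 type the constant expression is converted to int64,
// so this compiles on 32-bit targets where int is 32 bits. Writing
// `var largeDBSize = 1024 * 1024 * 1024 * 4` would default to int and fail
// there with "constant 4294967296 overflows int".
var largeDBSize int64 = 1024 * 1024 * 1024 * 4

func main() {
	fmt.Println("largeDBSize:", largeDBSize)
	fmt.Println("exceeds MaxInt32:", largeDBSize > math.MaxInt32)
}
```

Any code that later compares this value against an `int` (for example a file size) would also need an explicit `int64` conversion on those platforms.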
Could you provide deployment documentation for cluster mode (using the binary release)? Thanks!
From @weisd on May 7, 2017 3:50
I'm running nsqd, nsqlookupd, and nsqadmin in Docker.
Calling topic creation returns 500 INTERNAL_ERROR:
http://localhost:4161/topic/create?topic=test&partition_num=1&replicator=1
The log shows:
I0506 13:30:47.069194 20696 router.go:273] httprouter.(*Router).recv ERROR: panic in HTTP handler - runtime error: invalid memory address or nil pointer dereference
Is some component missing, or is this a bug? The other endpoints, such as ping, work normally.
Copied from original issue: absolute8511/nsq#22
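For reproducing this, here is a small sketch that builds the same `/topic/create` request against nsqlookupd's HTTP port. The query parameters `topic`, `partition_num`, and `replicator` come from the URL above; sending it as POST is an assumption (upstream nsqlookupd's admin endpoints accept POST), so switch to GET if your version expects it. `buildCreateTopicURL` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strconv"
	"time"
)

// buildCreateTopicURL assembles the nsqlookupd /topic/create URL.
// url.Values.Encode sorts keys, so the parameter order differs from the issue.
func buildCreateTopicURL(lookupdHTTP, topic string, partitions, replicas int) string {
	q := url.Values{}
	q.Set("topic", topic)
	q.Set("partition_num", strconv.Itoa(partitions))
	q.Set("replicator", strconv.Itoa(replicas))
	return lookupdHTTP + "/topic/create?" + q.Encode()
}

func main() {
	u := buildCreateTopicURL("http://localhost:4161", "test", 1, 1)
	client := &http.Client{Timeout: 5 * time.Second}
	// POST with an empty body is an assumption; see the note above.
	resp, err := client.Post(u, "application/x-www-form-urlencoded", nil)
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	// A "500 INTERNAL_ERROR" here matches the symptom in the issue.
	fmt.Println(resp.Status, string(body))
}
```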
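Suggestion: the Delete and Empty Queue buttons on the monitoring page shouldn't be so prominent. They could be moved somewhere less conspicuous to prevent accidental clicks.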
Is there documentation in Chinese? Thanks.
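Latest release of the youzan/nsq server (v0.3.7-HA.1.9.4.1) + latest youzan/go-nsq client (go-nsq v1.3.6-HA)
On a normal (unordered) topic, the consumer was mistakenly configured with EnableOrdered. Even after the client changes the setting back, message consumption is still abnormal.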
package main

import (
	"fmt"
	"time"

	"github.com/youzan/go-nsq"
)

func main() {
	addr := "172.16.0.30:4161" // nsqlookupd HTTP address
	config := nsq.NewConfig()
	config.MaxAttempts = 65535

	// Publish one message to delay_test through the topic producer manager.
	mgr, err := nsq.NewTopicProducerMgr([]string{"delay_test"}, config)
	if err != nil {
		panic(err)
	}
	mgr.ConnectToNSQLookupd(addr)
	err = mgr.Publish("delay_test", []byte("hello, "+time.Now().String()))
	if err != nil {
		panic(err)
	}

	// The topic delay_test does not have ordering enabled; setting EnableOrdered
	// here is the mistake. Even after this setting is reverted, the channel
	// keeps behaving abnormally.
	config.EnableOrdered = true
	c, err := nsq.NewConsumer("delay_test", "default", config)
	if err != nil {
		panic(err)
	}
	// Print each message and its attempt count, then requeue it after one
	// second without backoff.
	c.AddConcurrentHandlers(nsq.HandlerFunc(func(msg *nsq.Message) error {
		fmt.Println(string(msg.Body), msg.Attempts)
		msg.RequeueWithoutBackoff(time.Second)
		return nil
	}), 1)
	err = c.ConnectToNSQLookupd(addr)
	if err != nil {
		panic(err)
	}
	select {} // block forever so the consumer keeps running
}
Does nsq support etcd v3 now?
grpc has moved from google.golang.org/grpc to GitHub, so the dependency can't be downloaded from its original source. Thanks.
http.go:1282] nsqadmin.(*httpServer).clusterStatsHandler WARNING: failed to find lookupd leader at this moment
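I haven't deployed a cluster; I started nsq on a single machine and can't add a topic from the web page. The log shows this error. Is there a fix? I haven't deployed etcd.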
I have read the docs and found no mention of an auth mechanism for topics in a cluster. So my question is: how can we allow multiple tenants to use the same cluster, and how can a tenant protect its topics from read/write operations by another tenant?
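For example, can we subscribe to a notification, or read it from etcd? If it can't be obtained, is there a recommended strategy for updating the leader?
See title.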
github.com\youzan\nsq\nsqd\dqname_windows.go
github.com\youzan\nsq\nsqd\delay_queue.go
version: nsq-0.3.7-HA.1.6.4; delay_queue.go uses the getDelayQueueDBName method.
How can nsqadmin be configured to require a username and password login before topics and channels can be operated on?
Create Topic https://github.com/youzan/nsq/blob/7f4429f4d9df7cd33c4ec4d58e73343c2b975763/doc/resources/ordered_topic_create.png
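Creating a topic as shown in the image above fails with no error message; the page just refreshes.
Please display the error message from the last operation in the UI.
Besides starting nsqlookupd, nsqd, and nsqadmin, what else needs to be configured to create a topic? For example, what directories or KV values need to be created in etcd?
Thanks!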
Looking at the go-nsq code, I found a PublishOrdered method but no example. What value should be passed as the partitionKey parameter?
PublishOrdered(topic string, partitionKey []byte, body []byte)
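On what `partitionKey` could be: in ordered-publishing designs generally, the key is a business identifier (an order ID, a user ID) whose messages must stay in sequence, and the client hashes it to choose a partition, so equal keys always land on the same partition. The sketch below illustrates only that general idea using FNV-1a from the standard library; it is not go-nsq's actual hashing, and `partitionFor` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a partition key to a partition index in [0, numPartitions).
// FNV-1a is used purely for illustration; go-nsq may use a different hash.
func partitionFor(partitionKey []byte, numPartitions int) int {
	if numPartitions <= 0 {
		return -1 // no valid partition
	}
	h := fnv.New32a()
	h.Write(partitionKey) // hash.Hash.Write never returns an error
	return int(h.Sum32() % uint32(numPartitions))
}

func main() {
	// Messages for the same order ID always map to the same partition,
	// which keeps them in order relative to each other.
	for _, orderID := range []string{"order-1001", "order-1002", "order-1001"} {
		fmt.Printf("%s -> partition %d\n", orderID, partitionFor([]byte(orderID), 4))
	}
}
```

Under this reading, a call would look like `PublishOrdered("my_topic", []byte(orderID), body)`, with the topic name and `orderID` as placeholders.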
Using go-nsq and running the TestProducerConnection test in producer_test.go:
when the topic is ordered, the Producer's Publish call fails:
producer_test.go:114: should lazily connect - E_BAD_PARTITION topic partition is not valid for multi partition: -1
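As per the title: when a topic is created, which nsqd node is it created on, and how does nsqlookupd decide? Roughly which file contains the relevant source?
Does lookupd put the data into etcd, after which nsqd pulls it down and creates a new topic from that data?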
Extra option to allow nsq_tail and nsq_to_file to print the message timestamp, for example: -timestamp
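A rough sketch of what such a `-timestamp` switch could print. It assumes the message timestamp is nanoseconds since the Unix epoch, as in upstream go-nsq's `Message.Timestamp` field; the flag name comes from the request, while `formatLine` and the RFC 3339 UTC format are illustrative choices.

```go
package main

import (
	"flag"
	"fmt"
	"time"
)

// formatLine renders a message body, optionally prefixed with its timestamp
// (nanoseconds since the Unix epoch) formatted as RFC 3339 in UTC.
func formatLine(tsNanos int64, body []byte, withTimestamp bool) string {
	if !withTimestamp {
		return string(body)
	}
	ts := time.Unix(0, tsNanos).UTC().Format(time.RFC3339Nano)
	return ts + " " + string(body)
}

func main() {
	showTS := flag.Bool("timestamp", false, "prefix each message with its timestamp")
	flag.Parse()
	// Stand-in for a received message.
	fmt.Println(formatLine(time.Now().UnixNano(), []byte("hello"), *showTS))
}
```

`time.RFC3339Nano` trims trailing zeros in the fractional seconds, so whole-second timestamps print without a fraction.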
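I read your docs: currently each topic can have only one partition per node, so the number of partitions is limited by the number of nodes in the cluster. Partition scalability seems rather poor to me. Are there plans to support multiple partitions per node? Our use case requires ordering guarantees.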
I built the Docker image; what next? Do I just type nsqlookupd to start it?
I also don't know how to test it.
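Does nsqd's retention_size_per_day setting mean the daily retained size of (already consumed) messages for a single partition of a single topic? And is each partition's data split into segment files according to the max_bytes_per_file setting?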
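Are multi-datacenter and multi-NIC listening configurations supported now? Our use case: besides message consumption on the servers' internal network, we also need a channel consumer on the external network to consume and sync data. How should the service be configured for this?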
Using an etcd environment from https://github.com/coreos/etcd, I get the following errors:
E1223 10:44:28.050456 98763 nsqd_node_etcd.go:258] consistence.(*NsqdEtcdMgr).WatchLookupdLeader watcher key[/NSQMetaData/message/NsqlookupdInfo/LookupdLeaderSession] error: client: response is invalid json. The endpoint is probably not valid etcd cluster endpoint.
E1223 10:44:33.053676 98763 nsqd_node_etcd.go:258] consistence.(*NsqdEtcdMgr).WatchLookupdLeader watcher key[/NSQMetaData/message/NsqlookupdInfo/LookupdLeaderSession] error: client: response is invalid json. The endpoint is probably not valid etcd cluster endpoint.
Is there any documentation on etcd configuration?
Or could you suggest an approach?
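The message `client: response is invalid json. The endpoint is probably not valid etcd cluster endpoint` is the kind of error etcd's v2 client returns when a reply is not v2 keys API JSON. A quick way to check whether an endpoint serves that API is to request `/v2/keys/` and look for a v2-style JSON reply. This is a diagnostic sketch with hypothetical helper names; note that etcd 3.4 and later disable the v2 API unless started with `--enable-v2`.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"os"
	"time"
)

// v2Response holds fields common to etcd v2 keys API replies: successful
// reads carry "action", errors carry "errorCode".
type v2Response struct {
	Action    string `json:"action"`
	ErrorCode int    `json:"errorCode"`
}

// looksLikeV2 reports whether body is a JSON reply from the v2 keys API.
func looksLikeV2(body []byte) bool {
	var r v2Response
	if err := json.Unmarshal(body, &r); err != nil {
		return false
	}
	return r.Action != "" || r.ErrorCode != 0
}

func main() {
	endpoint := "http://127.0.0.1:2379" // hypothetical; use a cluster_leadership_addresses entry
	if len(os.Args) > 1 {
		endpoint = os.Args[1]
	}
	client := &http.Client{Timeout: 3 * time.Second}
	resp, err := client.Get(endpoint + "/v2/keys/")
	if err != nil {
		fmt.Println("request failed:", err)
		os.Exit(1)
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		fmt.Println("read failed:", err)
		return
	}
	if looksLikeV2(body) {
		fmt.Println("endpoint serves the etcd v2 keys API")
	} else {
		fmt.Printf("not a v2 keys reply (status %s): %.200s\n", resp.Status, body)
	}
}
```

If the endpoint answers with plain text such as "404 page not found", the client above would see invalid JSON, which is consistent with the logged error.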
From @rfyiamcool on August 24, 2017 5:14
See title. Thank you.
Copied from original issue: absolute8511/nsq#52
From @liushuchun on April 17, 2017 9:40
Copied from original issue: absolute8511/nsq#15
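When Retention Days is set to 0 at topic creation, old data is never deleted and keeps accumulating until the disk fills up.
Does the retention_size_per_day setting in the config file not take effect? I set it to 1 GB and the disk still fills up.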
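Hello! A question: does nsq support an ack mechanism like RabbitMQ's, where nsq deletes a message from the queue only after the message has been processed successfully and acked?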
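As per the title: even when getting past the firewall with a proxy, many packages still can't be downloaded, and a single stuck package often blocks everything after it, for example https://go.googlesource.com/exp and https://go.googlesource.com/text. Could you provide a Windows release package, as official nsq does?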
From @absolute8511 on April 19, 2017 3:11
To make all the technical details clear for other developers, we need to add more documentation. That way we can attract more contributors, and it will also help with merging the changes back into mainline.
Copied from original issue: absolute8511/nsq#18
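This is a forked project, right? At least state where it was forked from.
How do I enable Search/Trace? Is this service built in, or does it need to be provided by a third party?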