
hangout's People

Contributors

bigbo, childe, cyberdak, dependabot[bot], gmlewis, gnuhpc, hellococooo, jlleitschuh, linxuhong, rickyhuo, tianjinsong

hangout's Issues

hangout read/write thread count

I'd like to ask: do hangout's filter and output stages all run inside the Kafka consumer threads? If so, does that mean the number of read threads also determines the number of output threads?
In the config:
topic:
  app: 2  # decides how many threads fetch data for this topic
num.consumer.fetchers: "4"  # What is this setting for? Isn't the number of fetch threads already determined by the topic setting above?

Thanks for the answer.
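
For reference, a minimal Kafka input sketch showing where the two settings live, pieced together from other configs in these issues (topic name, group id and ZooKeeper address are illustrative placeholders):

inputs:
    - Kafka:
        codec: plain
        topic:
          app: 2 # threads fetching this topic
        consumer_settings:
          group.id: hangout # placeholder
          zookeeper.connect: zk1:2181 # placeholder
          num.consumer.fetchers: "4" # fetcher threads of the underlying Kafka consumer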

Unable to write to ES

Configured according to the sample:

- Elasticsearch:
    cluster: hangoutcluster
    hosts:
      - ip1
      - ip2
    index: 'hangout-%{+YYYY.MM.dd}'
    index_type: logs # default logs
    #document_id: ${id} # default null, generated by es
    bulk_actions: 20000 # default 20000
    bulk_size: 15 # default 15 MB
    flush_interval: 10 # default 10 seconds
    concurrent_requests: 0
    timezone: "Asia/Shanghai" # default UTC; only used when formatting the index name
    sniff: false # default true

Writing to ES fails.
First it reports: ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
Then the log shows the following errors:
2017-01-06 15:12:23,075 ERROR com.ctrip.ops.sysdev.outputs.Elasticsearch$1 Thread-4 bulk got exception
2017-01-06 15:12:23,076 ERROR com.ctrip.ops.sysdev.outputs.Elasticsearch$1 Thread-4 None of the configured nodes are available:
How can I resolve this?

hangout

After ZooKeeper was restarted, hangout got stuck and stopped consuming. After restarting hangout, the consumption rate dropped sharply. Logs below; I'm using version 0.1.8.2.
Could you take a look?
$1 pool-2-thread-1 bulk has failed item which NEED to retry
2017-03-20 10:54:10,623 ERROR com.ctrip.ops.sysdev.outputs.Elasticsearch$1 pool-2-thread-1 RemoteTransportException[[node_169][192.168.1.169:9300][indices:data/write/bulk[s]]]; nested: RemoteTransportException[[node_169][192.168.1.169:9300][indices:data/write/bulk[s][p]]]; nested: UnavailableShardsException[[test-2017.03.20][1] Not enough active copies to meet write consistency of [QUORUM] (have 1, needed 2). Timeout: [1m], request: [BulkShardRequest to [test-2017.03.20] containing [9755] requests]];
2017-03-20 10:55:16,312 ERROR com.ctrip.ops.sysdev.outputs.Elasticsearch$1 elasticsearch[Louise Mason][bulk_processor][T#1] bulk has failed item which NEED to retry
2017-03-20 10:55:16,313 ERROR com.ctrip.ops.sysdev.outputs.Elasticsearch$1 elasticsearch[Louise Mason][bulk_processor][T#1] RemoteTransportException[[node_169][192.168.1.169:9300][indices:data/write/bulk[s]]]; nested: RemoteTransportException[[node_169][192.168.1.169:9300][indices:data/write/bulk[s][p]]]; nested: UnavailableShardsException[[test-2017.03.20][1] Not enough active copies to meet write consistency of [QUORUM] (have 1, needed 2). Timeout: [1m], request: [BulkShardRequest to [test-2017.03.20] containing [4946] requests]];
2017-03-20 10:56:20,888 ERROR com.ctrip.ops.sysdev.outputs.Elasticsearch$1 pool-2-thread-1 bulk has failed item which NEED to retry
2017-03-20 10:56:20,890 ERROR com.ctrip.ops.sysdev.outputs.Elasticsearch$1 pool-2-thread-1 RemoteTransportException[[node_169][192.168.1.169:9300][indices:data/write/bulk[s][p]]]; nested: UnavailableShardsException[[test-2017.03.20][1] Not enough active copies to meet write consistency of [QUORUM] (have 1, needed 2). Timeout: [1m], request: [BulkShardRequest to [test-2017.03.20] containing [9771] requests]];

Does the Json filter limit the length of the JSON string?

Does the Json filter have a limit on the length of the JSON string it parses?
I've found that very long messages can be written to ES fine by Logstash, but hangout fails to process them.
Log output:

2017-05-24 04:54:21,143 WARN com.ctrip.ops.sysdev.filters.Json pool-3-thread-7 failed to json parse field: message
2017-05-24 04:54:21,147 WARN com.ctrip.ops.sysdev.filters.Json pool-3-thread-2 failed to json parse field: message
2017-05-24 04:54:21,149 WARN com.ctrip.ops.sysdev.filters.Json pool-3-thread-2 failed to json parse field: message
2017-05-24 04:54:22,686 WARN com.ctrip.ops.sysdev.filters.Json pool-3-thread-2 failed to json parse field: message
2017-05-24 04:54:22,815 WARN com.ctrip.ops.sysdev.filters.Json pool-3-thread-5 failed to json parse field: message

hangout errors at startup

hangout: hangout-0.2.1
kafka: 2.9.1-0.8.2.2
zookeeper: 3.4.6
Startup errors:
2017-05-11 11:19:26,531 WARN kafka.utils.Logging$class ConsumerFetcherThread-hangout_node-1-1494472750245-ca30e025-0-0 [ConsumerFetcherThread-hangout_node-1-1494472750245-ca30e025-0-0], Error in fetch kafka.consumer.ConsumerFetcherThread$FetchRequest@7a608ee4. Possible cause: java.lang.IllegalArgumentException
2017-05-11 11:19:26,748 WARN kafka.utils.Logging$class ConsumerFetcherThread-hangout_node-1-1494472750245-ca30e025-0-0 [ConsumerFetcherThread-hangout_node-1-1494472750245-ca30e025-0-0], Error in fetch kafka.consumer.ConsumerFetcherThread$FetchRequest@2925b301. Possible cause: java.lang.IllegalArgumentException

After switching to my own Kafka:
[root@elk bin]# ./hangout -f ../conf/kaf-es.yml
Hangout Version:0.2.1 Copyright @ctrip Author : childe@github, gnuhpc@github
Exception in thread "main" java.lang.NoClassDefFoundError: scala/ScalaObject
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at com.ctrip.ops.sysdev.core.Main.lambda$null$0(Main.java:67)
at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
at com.ctrip.ops.sysdev.core.Main.lambda$main$1(Main.java:58)
at java.util.ArrayList.forEach(ArrayList.java:1249)
at com.ctrip.ops.sysdev.core.Main.main(Main.java:56)
Caused by: java.lang.ClassNotFoundException: scala.ScalaObject
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 31 more

How should JSON data like this be parsed? (json parse fails)

Source data:

"@fields":{"cluster":"qy_api_v2_pool","body_bytes_sent":"8579","http_versioncode":"Android_32"}\n
"@fields":{"cluster":"qy_api_v2_pool","body_bytes_sent":"8579","http_versioncode":"Android_32"}\n
"@fields":{"cluster":"qy_api_v2_pool","body_bytes_sent":"8579","http_versioncode":"Android_32"}

Desired data:
{"cluster":"qy_api_v2_pool","body_bytes_sent":"8579","http_versioncode":"Android_32"}
{"cluster":"qy_api_v2_pool","body_bytes_sent":"8579","http_versioncode":"Android_32"}
{"cluster":"qy_api_v2_pool","body_bytes_sent":"8579","http_versioncode":"Android_32"}

1. I only want the data inside @fields.
2. And it should be split into 3 separate events. (The Kafka side asks for multiple records to be merged into one message before sending, to reduce overhead, so several records really are concatenated.)

How should a requirement like this be handled? Help appreciated.

Errors at startup when reading messages from Kafka

hangout is the downloaded release: hangout-0.1.8.2
kafka: 0.8.2.2
hangout config file:

inputs:
    - Kafka:
        codec: plain
        topic:
          loglib.app.json: 2
        consumer_settings:
          group.id: hangout
          zookeeper.connect: 1.2.3.4/kafka/uat
          auto.commit.interval.ms: "5000"
          socket.receive.buffer.bytes: "10485760"
          # fetch.message.max.bytes: "10485760"
          # num.consumer.fetchers: "2"

outputs:
    - Stdout: {}

Startup command: ./bin/hangout -f config/test.yml -l log/test.log
After startup it keeps logging errors:
2016-12-21 19:22:10,642 WARN kafka.utils.Logging$class ConsumerFetcherThread-hangout_AA-1482319320793-3d1d64a9-0-47 [ConsumerFetcherThread-hangout_AA-1482319320793-3d1d64a9-0-47], Error in fetch kafka.consumer.ConsumerFetcherThread$FetchRequest@23af0b3c. Possible cause: java.nio.BufferUnderflowException
2016-12-21 19:22:10,916 WARN kafka.utils.Logging$class ConsumerFetcherThread-hangout_AA-1482319320793-3d1d64a9-0-45 [ConsumerFetcherThread-hangout_AA-1482319320793-3d1d64a9-0-45], Error in fetch kafka.consumer.ConsumerFetcherThread$FetchRequest@2f9dc6f7. Possible cause: java.nio.BufferUnderflowException

hangout is started with a 5 GB heap:
HO_HEAP_SIZE="${HO_HEAP_SIZE:=5000m}"

Please help me figure out which part of the config is wrong. Thanks.

A problem using Kibana 3 with ES 2.0

elasticsearch 2.2.1
kibana 3 (using your fork of K3 that is compatible with ES 2)
When I pick a time range in the K3 timepicker and ES has no index for that time, ES throws IndexNotFoundException[no such index] and K3 stops refreshing its data (previously the K3 page displayed everything from the beginning up to now). Is there any way to avoid this?

Is user-agent parsing supported?

Hello:
For application logs such as nginx/apache, user-agent parsing is often needed.
For example:
ua Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36

After processing:
tua_category pc
tua_name Firefox
tua_os Windows 8.1
tua_vendor Mozilla
tua_version 41.0

Something like that. I've just started using hangout and it feels pretty good; I'm doing some data processing with it. At the moment I use the project below for UA parsing, and it works well:
https://github.com/bungoume/fluent-plugin-ua-parser

For the UA database, the following project can be used; it is updated promptly and is fairly comprehensive:
https://github.com/ua-parser/uap-core/blob/master/regexes.yaml

For reference!

Data cannot be written to Elasticsearch

hosts is configured as 127.0.0.1:9200.
It reports that no configured nodes are available. I've tried many ports and addresses and none of them work.
I'm using Elasticsearch 5.0 everywhere, and the branch I cloned is the 5.x branch.
What is the problem?

hangout writes data without honoring the field types defined in the template

I defined the following type in my template:
"errno": {
  "type": "long"
}
But after hangout writes the data it doesn't follow the template definition; looking in ES, the field is a string.
It only takes effect if I add the following to the hangout config file:

- Convert:
    fields:
      errno:
        to: integer
        remove_if_fail: true

Does hangout ignore the template definition when writing and only honor the conversion settings in its own config file?

Configuring hosts with ports

hosts:
  - 10.8.73.4:9300
  - 10.8.73.4:9310
  - 10.8.73.4:9320

Is it OK to configure the hosts with ports like this?
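
For reference, a sketch of how other configs in these issues (for example the es5.0 issue further down) embed host:port entries in the Elasticsearch output; the cluster name and index here are placeholders:

outputs:
    - Elasticsearch:
        cluster: my_cluster # placeholder
        hosts:
          - 10.8.73.4:9300
          - 10.8.73.4:9310
          - 10.8.73.4:9320
        index: 'hangout-%{+YYYY.MM.dd}'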

A note of thanks

hangout has been running in production for over a year and is very stable. Thanks to the author!

Gsub on message doesn't work; how should data like this be handled with hangout?

Source data:

{
"@timestamp":"2017-08-03T10:14:22+08:00",
"@fields":{"cluster":"detail_pool","request":"GET","body_bytes_sent":"9273","http_uid":"26","upstream_status":"200","protocol":"http","hostname":"localhost"}
}

All I ultimately want is:

{
"@timestamp":"2017-08-03T10:14:22+08:00",
"cluster":"detail_pool",
"upstream_status":"200"
}

Logstash can handle this, but I don't know what syntax to use in hangout. I tried to first rewrite away the @fields wrapper and then Grok it, but the substitution fails.

Sorry to trouble you again!

With multiple single-threaded inputs, only the first one runs

The Kafka input starts one or more extra consumer threads from within the input thread. But a single-threaded input like Stdin loops forever inside its emit method, so the inputs that come after it never get to run.

Kafka configuration for the output module

Hangout Version: 0.2.1
Could you provide a configuration template for the new Kafka output module? The template in the README doesn't work. Thanks.

Host list when outputting to ES

When outputting to ES, does the host list need to include every data node? Can a client node be used instead?
If all the data nodes are listed, what load-balancing strategy does hangout use?
I'm on ES 2.2.1 and hangout 0.1.8.2.

After ES goes down and restarts, hangout stops writing data to ES

Hi, I've run into a problem: after the ES master went down and was restarted, hangout no longer writes any data to ES and just sits idle. Restarting hangout brings it back to normal.

Is there any way to fix this?

Can data that fails JSON parsing be printed via stdout?

When the data volume is fairly large, the log shows lots of parse-failure errors, but I can't tell what the failing data actually looks like.
Can it be printed out so I can track it down?
2017-05-03 03:16:53,092 WARN com.ctrip.ops.sysdev.filters.Json pool-3-thread-1 failed to json parse field: message

es version

Hello:
Does ES have to be version 5.0 or above for writes to work properly?

My 2.4 cluster reports the following errors:
2017-01-11 11:40:17,239 ERROR com.ctrip.ops.sysdev.outputs.Elasticsearch$1 elasticsearch[client][bulk_processor][T#1] bulk got exception
2017-01-11 11:40:17,239 ERROR com.ctrip.ops.sysdev.outputs.Elasticsearch$1 elasticsearch[client][bulk_processor][T#1] None of the configured nodes are available: [{#transport#-1}{tQ0EE85_TdeBvfH1cC2bbQ}{x}{x}]

Output to Elasticsearch shows me an error!

Error message as follows:
ERROR com.ctrip.ops.sysdev.outputs.Elasticsearch$1 elasticsearch[Ransak the Reject][bulk_processor][T#1] bulk got exception
ERROR com.ctrip.ops.sysdev.outputs.Elasticsearch$1 elasticsearch[Ransak the Reject][bulk_processor][T#1] None of the configured nodes are available: [{#transport#-1}{192.168.10.2}{192.168.10.2:8888}]

What does the error mean?

Here is my output config:
outputs:
    - Elasticsearch:
        cluster: log
        hosts:
          - 192.168.10.2:8888
        index: 'test-%{+YYYY.MM.dd}'
        sniff: false

About Kafka errors in the hangout log file.

The hangout log file sometimes shows the following errors:
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler pool-1-thread-1 Error UNKNOWN_MEMBER_ID occurred while committing offsets for group xxx
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator pool-1-thread-1 Auto offset commit failed

Why does this occur?

How to rotate the log when saving to a file

If, in addition to writing to ES, I want to persist events to files and also need to rotate those files, should I output to stdout and split the files with something like cronolog?

I don't see a File-type output plugin.

Thanks!
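
A minimal sketch of the stdout-plus-cronolog approach mentioned above (paths and file-name pattern are illustrative; it assumes the config contains a Stdout output so that events go to standard output, while hangout's own log goes elsewhere via -l):

./bin/hangout -f conf/example.yml -l log/hangout.log | cronolog /data/events/%Y-%m-%d.events.log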

Downloaded, cannot run on my system

I cloned it from GitHub and ran it, and got an error:
bin/hangout -f example.yml
Error: Could not find or load main class com.ctrip.ops.sysdev.Main

Setting index_type from a field value errors when the field does not exist

When index_type takes its value from a field and the message does not contain that field, an error is thrown. Could this be handled more gracefully: when the field is missing, fall back to a default value or to a second-priority field?

The config:

outputs:
    - Elasticsearch:
        cluster: elasticsearch
        hosts:
          - 192.168.12.29
        index: '%{@index}'
        index_type: ${@type} # default logs
        #document_id: ${id} # default null, generated by es
        bulk_actions: 20000 #default 20000
        bulk_size: 15 # default 15 MB
        flush_interval: 10 # default 10 seconds
        concurrent_requests: 0 # default 0; setting concurrent_requests greater than 0 means multi-threaded processing, which in my experience still carries a real OOM risk, so I strongly recommend keeping it at 0
        #timezone: "Asia/Shanghai" # default UTC; only used when formatting the index name
        #sniff: false # default true

Sample data:

[
            '@index' => 'jw',
           //  '@type' => 'jw_type4',  // errors when this field is absent
            'lion_k' => date('Y-m-d H:i:s'),
            'test_array' => [
                'k1' => 1,
                'k2' => 'aaa'
            ]
]

When the @type field is missing, it errors out.

The error is as follows:

2017-05-03 10:53:11,337 ERROR freemarker.log._Log4jLoggerFactory$Log4jLogger pool-2-thread-7 Error executing FreeMarker template
FreeMarker template error:
The following has evaluated to null or missing:
==> @type  [in template "${@type}" at line 1, column 3]

----
Tip: If the failing expression is known to be legally refer to something that's sometimes null or missing, either specify a default value like myOptionalVar!myDefault, or use <#if myOptionalVar??>when-present<#else>when-missing</#if>. (These only cover the last step of the expression; to cover the whole expression, use parenthesis: (myOptionalVar.foo)!myDefault, (myOptionalVar.foo)??
----

----
FTL stack trace ("~" means nesting-related):
        - Failed at: ${@type}  [in template "${@type}" at line 1, column 1]
----

Java stack trace (for programmers):
----
freemarker.core.InvalidReferenceException: [... Exception message was already printed; see it above ...]
        at freemarker.core.InvalidReferenceException.getInstance(InvalidReferenceException.java:131)
        at freemarker.core.EvalUtil.coerceModelToString(EvalUtil.java:355)
        at freemarker.core.Expression.evalAndCoerceToString(Expression.java:82)
        at freemarker.core.DollarVariable.accept(DollarVariable.java:41)
        at freemarker.core.Environment.visit(Environment.java:324)
        at freemarker.core.Environment.process(Environment.java:302)
        at freemarker.template.Template.process(Template.java:323)
        at com.ctrip.ops.sysdev.render.FreeMarkerRender.render(FreeMarkerRender.java:34)
        at com.ctrip.ops.sysdev.outputs.Elasticsearch.emit(Elasticsearch.java:215)
        at com.ctrip.ops.sysdev.outputs.BaseOutput.process(BaseOutput.java:60)
        at com.ctrip.ops.sysdev.inputs.Kafka$Consumer.run(Kafka.java:77)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
2017-05-03 10:53:11,694 ERROR com.ctrip.ops.sysdev.render.FreeMarkerRender pool-2-thread-7 The following has evaluated to null or missing:
==> @type  [in template "${@type}" at line 1, column 3]

----
Tip: If the failing expression is known to be legally refer to something that's sometimes null or missing, either specify a default value like myOptionalVar!myDefault, or use <#if myOptionalVar??>when-present<#else>when-missing</#if>. (These only cover the last step of the expression; to cover the whole expression, use parenthesis: (myOptionalVar.foo)!myDefault, (myOptionalVar.foo)??
----

----
FTL stack trace ("~" means nesting-related):
        - Failed at: ${@type}  [in template "${@type}" at line 1, column 1]
----
2017-05-03 10:53:18,253 ERROR com.ctrip.ops.sysdev.outputs.Elasticsearch$1 elasticsearch[Rick Jones][bulk_processor][T#1] bulk has failed item which do NOT need to retry
2017-05-03 10:53:18,255 ERROR com.ctrip.ops.sysdev.outputs.Elasticsearch$1 elasticsearch[Rick Jones][bulk_processor][T#1] java.lang.IllegalArgumentException: Validation Failed: 1: mapping type is empty;
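
Following the FreeMarker tip printed in the error above, one possible workaround is to give the template expression a default so rendering does not fail when @type is missing. This is an untested sketch that assumes index_type is rendered by the same FreeMarker engine shown in the stack trace:

        index_type: '${@type!"logs"}' # falls back to "logs" when @type is absent (FreeMarker default-value syntax)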

hangout-0.1.8.2-ES2.3.5 errors when reading messages from Kafka

hangout: hangout-0.1.8.2-ES2.3.5
kafka: 0.8.2.0
zookeeper: 3.3.5

Config file:
inputs:
    - Kafka:
        codec: json
        encoding: UTF8 # default UTF8
        topic:
          log_billing: 2
        consumer_settings:
          group.id: hangout
          zookeeper.connect: xx.xx.xx.xx:21810
          auto.commit.interval.ms: "1000"
outputs:
    - Stdout: {}

No messages are read from Kafka, and the log reports the following error:
2017-01-20 11:26:39,151 WARN kafka.utils.Logging$class ConsumerFetcherThread-hangout_resourcepool-0231-jkdd01-1484882793828-16b4fdc5-0-2 [ConsumerFetcherThread-hangout_resourcepool-0231-jkdd01-1484882793828-16b4fdc5-0-2], Error in fetch kafka.consumer.ConsumerFetcherThread$FetchRequest@1965eff6. Possible cause: java.lang.IllegalArgumentException

After replacing the Kafka-related jars under vendor with the jars from the Kafka directory on my server, startup fails with this error:
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at com.ctrip.ops.sysdev.Main.main(Main.java:107)
Caused by: java.lang.NoClassDefFoundError: scala/reflect/ClassManifest
at kafka.utils.Log4jController$.(Log4jController.scala:29)
at kafka.utils.Log4jController$.(Log4jController.scala)
at kafka.utils.Logging$class.$init$(Logging.scala:29)
at kafka.utils.VerifiableProperties.(VerifiableProperties.scala:26)
at kafka.consumer.ConsumerConfig.(ConsumerConfig.scala:94)
at com.ctrip.ops.sysdev.inputs.Kafka.prepare(Kafka.java:114)
at com.ctrip.ops.sysdev.inputs.BaseInput.(BaseInput.java:105)
at com.ctrip.ops.sysdev.inputs.Kafka.(Kafka.java:96)
... 5 more
Caused by: java.lang.ClassNotFoundException: scala.reflect.ClassManifest
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)

How can the Elasticsearch output use the time from the log?

Hangout Version: 0.2.1

I want to set the index based on the time in the log, but when I configure it as shown in the front-page README.md I get an error.

- Elasticsearch
    cluster: es
    host:
        - 127.0.0.1
    index: 'hangout-${@timestamp.toString("YYYY.MM.dd")}'
    index_type: logs
    bulk_actions: 20000
    bulk_size: 15
    flush_interval: 10
    concurrent_requests: 0

The error message is:

ERROR com.ctrip.ops.sysdev.outputs.Elasticsearch$1 elasticsearch[client][bulk_processor][T#1] RemoteTransportException[[127.0.0.1][127.0.0.1:9300][indices:admin/create]]; nested: InvalidIndexNameException[Invalid index name [hangout-${@timestamp.toString("YYYY.MM.dd")}], must not contain the following characters [ , ", *, , <, |, ,, >, /, ?]];
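
For comparison, the working configs quoted in other issues here format the index name with the %{+...} date syntax rather than a FreeMarker expression, for example:

    index: 'hangout-%{+YYYY.MM.dd}'
    timezone: "Asia/Shanghai" # optional; per the comments elsewhere, only affects how the index name is formatted

Whether that syntax takes the date from the event's own @timestamp or from the wall clock is not spelled out in these issues.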

Hoping hangout can get something like Logstash's urldecode filter and the include_keys option of its kv filter

Hello childe, some of the production logs at my company are fairly messy, and in actual use I found I also need something like Logstash's urldecode plugin and the include_keys option of its kv plugin. If it's convenient, I hope these features can be added. Thank you.
Logstash:

urldecode {
  # all_fields => true
  field => ["url_args"]
}

KV:

kv {
  prefix => "url_"
  source => "url_args"
  field_split => "&"
  include_keys => [ 'appv','net','osn','osv' ]
}

ES 5.0 write failures

Errors occur when consuming from Kafka and writing to ES.
The log shows: 2017-06-28 18:20:06,740 ERROR com.ctrip.ops.sysdev.outputs.Elasticsearch$1 pool-2-thread-3 bulk got exception: None of the configured nodes are available: [{#transport#-1}{3g60MJJNTMq4LiK82taR5w}{10.10.1.14}{10.10.1.14:9291}]
That node is a data node. My ES output config:
outputs:
    - Elasticsearch:
        cluster: clustername # (the correct cluster name goes here)
        hosts:
          - 10.10.1.14:9291
        index: 'java-test-%{+YYYY.MM.dd.HH}'
        index_type: logs # default logs
        #document_id: ${id} # default null, generated by es
        bulk_actions: 20000 # default 20000
        bulk_size: 15 # default 15 MB
        flush_interval: 1 # default 10 seconds
        concurrent_requests: 0
        timezone: "Asia/Shanghai" # default UTC; only used when formatting the index name
        sniff: false # default true
My Kafka input config:
inputs:
    - Kafka:
        codec: json
        encoding: UTF8 # default UTF8
        topic:
          java_test_topic: 4
        #topic_pattern: # pattern has high priority; if specified, topic will be ignored
        #  test.*: 3
        consumer_settings:
          group.id: hangout-test
          zookeeper.connect: 10.10.1.18:2181,10.10.1.19:2181,10.10.1.20:2181
          fetch.message.max.bytes: "3145728"
          socket.receive.buffer.bytes: "1048576"
          auto.commit.interval.ms: "1000"

The 0.1.8 release errors at startup; the old version has no problem

I don't remember exactly which old version it was, but it does work in practice; presumably it's a build from before 2.x support was added.
kafka 0.8.3
zk 3.4
sun jdk 8

inputs:
    - Kafka:
        topic:
          voice_log: 2
        consumer_settings:
          group.id: logstash_lbc_haproxxxxx
          auto.commit.interval.ms: "1000"
          zookeeper.connect: 10.120.169.149:2181
          num.consumer.fetchers: "2"
        codec: plain
outputs:
    - Stdout: {}

With the old version it starts normally and receives data; the new version errors:

2016-09-23 10:55:30,953 DEBUG org.apache.zookeeper.ClientCnxn$SendThread main-SendThread(10.120.169.149:2181) Reading reply sessionid:0x1573a3557f2397f, packet:: clientPath:null serverPath:null finished:false header:: 59,4  replyHeader:: 59,700877946,0  request:: '/brokers/ids/1,F  response:: #7b226a6d785f706f7274223a393939392c2274696d657374616d70223a2231343734353132383437383334222c22686f7374223a2231302e3132302e3136392e313439222c2276657273696f6e223a312c22706f7274223a393039327d,s{695560785,695560785,1474512847839,1474512847839,0,0,0,96609917794922377,93,0,695560785}
2016-09-23 10:55:30,958 DEBUG org.apache.zookeeper.ClientCnxn$SendThread main-SendThread(10.120.169.149:2181) Reading reply sessionid:0x1573a3557f2397f, packet:: clientPath:null serverPath:null finished:false header:: 60,4  replyHeader:: 60,700877946,0  request:: '/brokers/ids/2,F  response:: #7b226a6d785f706f7274223a393939392c2274696d657374616d70223a2231343734353132383530343338222c22686f7374223a2231302e3132302e3136392e313530222c2276657273696f6e223a312c22706f7274223a393039327d,s{695561281,695561281,1474512850439,1474512850439,0,0,0,96609917794922297,93,0,695561281}
2016-09-23 10:55:30,962 INFO kafka.utils.Logging$class logstash_lbc_haproxxxxx_elk-logstash01-103.xx.com-1474599327899-d00dc0c0-leader-finder-thread Verifying properties
2016-09-23 10:55:30,962 INFO kafka.utils.Logging$class logstash_lbc_haproxxxxx_elk-logstash01-103.xx.com-1474599327899-d00dc0c0-leader-finder-thread Property client.id is overridden to logstash_lbc_haproxxxxx
2016-09-23 10:55:30,963 INFO kafka.utils.Logging$class logstash_lbc_haproxxxxx_elk-logstash01-103.xx.com-1474599327899-d00dc0c0-leader-finder-thread Property metadata.broker.list is overridden to 10.120.169.149:9092,10.120.169.150:9092
2016-09-23 10:55:30,963 INFO kafka.utils.Logging$class logstash_lbc_haproxxxxx_elk-logstash01-103.xx.com-1474599327899-d00dc0c0-leader-finder-thread Property request.timeout.ms is overridden to 30000
2016-09-23 10:55:30,963 INFO kafka.utils.Logging$class logstash_lbc_haproxxxxx_elk-logstash01-103.xx.com-1474599327899-d00dc0c0-leader-finder-thread Fetching metadata from broker BrokerEndPoint(2,10.120.169.150,9092) with correlation id 11 for 1 topic(s) Set(voice_log)
2016-09-23 10:55:30,964 DEBUG kafka.utils.Logging$class logstash_lbc_haproxxxxx_elk-logstash01-103.xx.com-1474599327899-d00dc0c0-leader-finder-thread Created socket with SO_TIMEOUT = 30000 (requested 30000), SO_RCVBUF = 186240 (requested -1), SO_SNDBUF = 102400 (requested 102400), connectTimeoutMs = 30000.
2016-09-23 10:55:30,964 INFO kafka.utils.Logging$class logstash_lbc_haproxxxxx_elk-logstash01-103.xx.com-1474599327899-d00dc0c0-leader-finder-thread Connected to 10.120.169.150:9092 for producing
2016-09-23 10:55:30,965 DEBUG kafka.utils.Logging$class ConsumerFetcherThread-logstash_lbc_haproxxxxx_elk-logstash01-103.xx.com-1474599327899-d00dc0c0-1-2 [ConsumerFetcherThread-logstash_lbc_haproxxxxx_elk-logstash01-103.xx.com-1474599327899-d00dc0c0-1-2], handling partitions with error for Set([voice_log,0])
2016-09-23 10:55:30,966 INFO kafka.utils.Logging$class logstash_lbc_haproxxxxx_elk-logstash01-103.xx.com-1474599327899-d00dc0c0-leader-finder-thread Disconnecting from 10.120.169.150:9092
2016-09-23 10:55:30,966 DEBUG kafka.utils.Logging$class logstash_lbc_haproxxxxx_elk-logstash01-103.xx.com-1474599327899-d00dc0c0-leader-finder-thread Successfully fetched metadata for 1 topic(s) Set(voice_log)
2016-09-23 10:55:30,967 DEBUG kafka.utils.Logging$class logstash_lbc_haproxxxxx_elk-logstash01-103.xx.com-1474599327899-d00dc0c0-leader-finder-thread [logstash_lbc_haproxxxxx_elk-logstash01-103.xx.com-1474599327899-d00dc0c0-leader-finder-thread], {TopicMetadata for topic voice_log ->
Metadata for partition [voice_log,1] is         partition 1     leader: BrokerEndPoint(1,10.120.169.149,9092)   replicas: BrokerEndPoint(1,10.120.169.149,9092) isr: BrokerEndPoint(1,10.120.169.149,9092)      isUnderReplicated: false
Metadata for partition [voice_log,0] is         partition 0     leader: BrokerEndPoint(2,10.120.169.150,9092)   replicas: BrokerEndPoint(2,10.120.169.150,9092) isr: BrokerEndPoint(2,10.120.169.150,9092)      isUnderReplicated: false}
2016-09-23 10:55:30,967 DEBUG kafka.utils.Logging$class ConsumerFetcherThread-logstash_lbc_haproxxxxx_elk-logstash01-103.xx.com-1474599327899-d00dc0c0-1-2 [ConsumerFetcherManager-1474599328022] adding partitions with error Set([voice_log,0])
2016-09-23 10:55:30,969 INFO kafka.utils.Logging$class ConsumerFetcherThread-logstash_lbc_haproxxxxx_elk-logstash01-103.xx.com-1474599327899-d00dc0c0-0-1 [ConsumerFetcherThread-logstash_lbc_haproxxxxx_elk-logstash01-103.xx.com-1474599327899-d00dc0c0-0-1], Starting
2016-09-23 10:55:30,969 INFO kafka.utils.Logging$class logstash_lbc_haproxxxxx_elk-logstash01-103.xx.com-1474599327899-d00dc0c0-leader-finder-thread [ConsumerFetcherManager-1474599328022] Added fetcher for partitions ArrayBuffer([[voice_log,1], initOffset 12394251065 to broker BrokerEndPoint(1,10.120.169.149,9092)] )
2016-09-23 10:55:30,969 DEBUG kafka.utils.Logging$class ConsumerFetcherThread-logstash_lbc_haproxxxxx_elk-logstash01-103.xx.com-1474599327899-d00dc0c0-0-1 Disconnecting from 10.120.169.149:9092

Question about the number of Kafka consumer threads

Hello, I use a single hangout instance to consume data from multiple Kafka topics.
Each topic has 10 partitions.
It consumes roughly 10 topics and writes to ES. Two settings in my config control thread counts:
topic:
  app_fix: 2
consumer_settings:
  num.consumer.fetchers: "3"
The first setting: start 2 threads to fetch data from this topic.
The second setting: the number of consumer fetchers to start; increasing it moderately can improve concurrency.
But I get this error:
java.lang.IllegalStateException: bulk process already closed
at org.elasticsearch.action.bulk.BulkProcessor.ensureOpen(BulkProcessor.java:281)
at org.elasticsearch.action.bulk.BulkProcessor.internalAdd(BulkProcessor.java:286)
at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:271)
at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:267)
at org.elasticsearch.action.bulk.BulkProcessor.add(BulkProcessor.java:253)
at com.ctrip.ops.sysdev.outputs.Elasticsearch.emit(Elasticsearch.java:226)
at com.ctrip.ops.sysdev.outputs.BaseOutput.process(BaseOutput.java:60)
at com.ctrip.ops.sysdev.inputs.Kafka$Consumer.run(Kafka.java:77)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
2017-02-17 18:43:24,758 ERROR com.ctrip.ops.sysdev.inputs.Kafka$Consumer pool-31-thread-1 java.lang.IllegalStateException: bulk process already closed
Is this related to the thread counts I set, or is it an ES problem? And how exactly should those two parameters be understood?
Thanks!

Pulling data from Kafka to ES; startup always errors: bin/hangout -f test.yml

Exception in thread "main" java.lang.NullPointerException
at scala.collection.convert.Wrappers$JMapWrapperLike$$anon$2.(Wrappers.scala:265)
at scala.collection.convert.Wrappers$JMapWrapperLike$class.iterator(Wrappers.scala:264)
at scala.collection.convert.Wrappers$JMapWrapper.iterator(Wrappers.scala:275)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:105)
at scala.collection.TraversableOnce$class.$div$colon(TraversableOnce.scala:138)
at scala.collection.AbstractTraversable.$div$colon(Traversable.scala:105)
at scala.collection.immutable.MapLike$class.$plus$plus(MapLike.scala:87)
at scala.collection.immutable.AbstractMap.$plus$plus(Map.scala:187)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:83)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:97)
at com.ctrip.ops.sysdev.inputs.Kafka.emit(Kafka.java:128)
at com.ctrip.ops.sysdev.Main.main(Main.java:109)

My config:
inputs:
    - Kafka:
        codec: json
        topic:
          wiseweb_crawler_webpage: 4
        consumer_settings:
          group.id: hangouts
          # socket.receive.buffer.bytes: "1048576"
          # fetch.message.max.bytes: "1048576"
          zookeeper.connect: 10.165.65.14:2181,10.171.100.247:2181,10.251.5.35:2181
          auto.commit.interval.ms: "5000"
That's the input section.

Thanks.

Installation

How do I install it?

GeoIP parsing fails

- GeoIP2:
    source: client_ip
    target: geoip
    city_name: false
    database: '/home/zcola/GeoLite2-Country.mmdb'
    #database: '/home/elk/running/logstash/vendor/nxxxe.mmdb'

DEBUG com.ctrip.ops.sysdev.filters.GeoIP2 pool-2-thread-2 java.lang.UnsupportedOperationException: Invalid attempt to open a GeoLite2-Country database using the city method

Neither a self-built mmdb nor the free GeoLite2-Country.mmdb works; a GeoIP2 plugin I wrote for Logstash myself works fine.
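
Judging from the exception text above (the filter opens the database "using the city method"), the GeoIP2 filter appears to expect a City-level database, so pointing it at a GeoLite2-City file may avoid the error. An untested sketch, with the path as a placeholder:

- GeoIP2:
    source: client_ip
    target: geoip
    # assumption: the filter reads the database with the MaxMind City reader,
    # so a GeoLite2-City database is needed rather than the Country edition
    database: '/home/zcola/GeoLite2-City.mmdb'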
