
fluent-plugin-concat's Introduction

fluent-plugin-concat


Fluentd filter plugin to concatenate multiline logs separated into multiple events.

Requirements

fluent-plugin-concat | fluentd    | ruby
>= 2.0.0             | >= v0.14.0 | >= 2.1
< 2.0.0              | >= v0.12.0 | >= 1.9

Installation

Add this line to your application's Gemfile:

gem 'fluent-plugin-concat'

And then execute:

$ bundle

Or install it yourself as:

$ gem install fluent-plugin-concat

Configuration

Example

<filter docker.log>
  @type concat
  key log
  #separator "\n"
  n_lines 10
  #multiline_start_regexp /^Start/
  #multiline_end_regexp /^End/
  #continuous_line_regexp nil
  #stream_identity_key nil
  #flush_interval 60
  #timeout_label nil
  #use_first_timestamp false
  #partial_key nil
  #partial_value nil
  #keep_partial_key false
  #use_partial_metadata false
  #keep_partial_metadata false
  #partial_metadata_format docker-fluentd
  #use_partial_cri_logtag false
  #partial_cri_logtag_key nil
  #partial_cri_stream_key stream
</filter>

Parameter

parameter | description | default
key | The key for the part of the multiline log |
separator | The separator of lines | "\n"
n_lines | The number of lines. This is exclusive with multiline_start_regexp | nil
multiline_start_regexp | The regexp to match the beginning of a multiline entry. This is exclusive with n_lines | nil
multiline_end_regexp | The regexp to match the end of a multiline entry. This is exclusive with n_lines | nil
continuous_line_regexp | The regexp to match continuous lines. This is exclusive with n_lines | nil
stream_identity_key | The key to determine which stream an event belongs to | nil
flush_interval | The number of seconds after which the last received event log will be flushed. If 0 is specified, wait for the next line forever | 60
timeout_label | The label name to handle events caused by timeout | nil
use_first_timestamp | Use the timestamp of the first record when the buffer is flushed | false
partial_key | The field name that is the reference to concatenate records | nil
partial_value | The value stored in the field specified by partial_key that represents a partial log | nil
keep_partial_key | If true, keep partial_key in concatenated records | false
use_partial_metadata | Use partial metadata to concatenate multiple records | false
keep_partial_metadata | If true, keep partial metadata | false
partial_metadata_format | Input format of the partial metadata (fluentd or journald docker log driver): docker-fluentd, docker-journald, or docker-journald-lowercase. Configure this based on the input plugin in use; the docker fluentd and journald log drivers behave differently, so the plugin needs to know what to look for. Use docker-journald-lowercase if you have fields_lowercase true in the journald source config | docker-fluentd
use_partial_cri_logtag | (bool, optional) Use the CRI log tag to concatenate multiple records | false
partial_cri_logtag_key | (string, optional) The key name that is referred to when concatenating records in CRI logs | nil
partial_cri_stream_key | (string, optional) The key name that is referred to when detecting the stream name in CRI logs | stream

Usage

Every 10 events will be concatenated into one event.

<filter docker.log>
  @type concat
  key message
  n_lines 10
</filter>
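
As an illustration of the documented behavior (key, n_lines and the default "\n" separator), with n_lines 3 three incoming events such as

{"message": "line 1"}
{"message": "line 2"}
{"message": "line 3"}

would be flushed as the single event

{"message": "line 1\nline 2\nline 3"}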

Specify the first line of a multiline entry with a regular expression.

<filter docker.log>
  @type concat
  key message
  multiline_start_regexp /^Start/
</filter>
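
Stack traces can be grouped by combining the start pattern with continuous_line_regexp, which marks the lines that belong to the current entry. This is only a sketch, assuming log entries start with a date and continuation lines are indented or start with "Caused by:"; adjust the patterns to your format.

<filter docker.log>
  @type concat
  key message
  multiline_start_regexp /^\d{4}-\d{2}-\d{2}/
  continuous_line_regexp /^(\s+|Caused by:)/
  flush_interval 5
</filter>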

You can handle timeout events and remaining buffers flushed on shutdown of this plugin.

<label @ERROR>
  <match docker.log>
    @type file
    path /path/to/error.log
  </match>
</label>

Handle timeout log lines the same as normal logs.

<filter **>
  @type concat
  key message
  multiline_start_regexp /^Start/
  flush_interval 5
  timeout_label @NORMAL
</filter>

<match **>
  @type relabel
  @label @NORMAL
</match>

<label @NORMAL>
  <match **>
    @type stdout
  </match>
</label>

Handle single line JSON from Docker containers.

<filter **>
  @type concat
  key message
  multiline_end_regexp /\n$/
</filter>

Handle Docker logs split into several parts (using partial_message), and do not add a new line between the parts.

<filter>
  @type concat
  key log
  partial_key partial_message
  partial_value true
  separator ""
</filter>
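
When partial parts from several containers can be interleaved on the same stream (for example with the fluentd log driver), the per-container buffers can be kept apart with stream_identity_key. A minimal sketch, assuming each record carries a container_id field:

<filter>
  @type concat
  key log
  partial_key partial_message
  partial_value true
  stream_identity_key container_id
  separator ""
</filter>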

(Docker v19.03+) Handle Docker logs split into several parts (using use_partial_metadata), and do not add a new line between the parts.

<filter>
  @type concat
  key log
  use_partial_metadata true
  separator ""
</filter>

(Docker v20.10+) Handle Docker logs split into several parts (using use_partial_metadata), and do not add a new line between the parts.

Docker v20.10 improved partial message handling by adding better metadata in the journald log driver. This now works similarly to the fluentd log driver, but requires one additional setting:

<filter>
  @type concat
  key log
  use_partial_metadata true
  partial_metadata_format docker-journald
  separator ""
</filter>

Handle Docker logs split into several parts (using newline detection), and do not add a new line between the parts (prior to Docker 18.06).

<filter **>
  @type concat
  key log
  multiline_end_regexp /\\n$/
  separator ""
</filter>

Handle containerd/cri in Kubernetes.

<source>
  @type tail
  path /var/log/containers/*.log
  <parse>
    @type regexp
    expression /^(?<time>[^ ]+) (?<stream>stdout|stderr) (?<logtag>[^ ]*) (?<message>.*)$/
    time_format %Y-%m-%dT%H:%M:%S.%L%z
  </parse>
  tag k8s
  @label @CONCAT
</source>

<label @CONCAT>
  <filter k8s>
    @type concat
    key message
    use_partial_cri_logtag true
    partial_cri_logtag_key logtag
    partial_cri_stream_key stream
  </filter>
  <match k8s>
    @type relabel
    @label @OUTPUT
  </match>
</label>

<label @OUTPUT>
  <match>
    @type stdout
  </match>
</label>
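
For reference, CRI-format log lines look roughly like the following, where the third field (captured as logtag by the parser above) is P for a partial chunk and F for the final one; use_partial_cri_logtag joins the P chunks with the following F chunk. Illustrative lines only:

2021-01-01T00:00:00.000000001Z stdout P first half of a long line
2021-01-01T00:00:00.000000002Z stdout F second half of the same line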

Contributing

  1. Fork it
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Add some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create new Pull Request

License

The gem is available as open source under the terms of the MIT License.

fluent-plugin-concat's People

Contributors

bai, cosmo0920, desaintmartin, jaygorrell, jpdenford, kenhys, lobeck, okkez, raytung, repeatedly, sfrostick, takotakot, tobilarscheid


fluent-plugin-concat's Issues

Plugin does not support @label

The fluentd out_relabel documentation states:

FYI: All input and output plugins also have the @label parameter provided by Fluentd core. The relabel plugin is a plugin which actually does nothing, but supports only the @label parameter.

While this is not a core plugin, nor is it an input or output plugin, it does provide timeout_label functionality.

As described in this issue thread, there is a method to grab timeout_label-captured messages in a similar fashion to non-labelled or otherwise-labelled events.

It involves using the relabel plugin to capture the non-timeout_label output and give it the same label, like so:

<filter mything>
    @type concat
    .. snip ..
    timeout_label @NORMAL
</filter>

<match **>
    @type relabel
    @label @NORMAL
</match>

<label @NORMAL>
    ... snip ...
</label>

I think that this step could be alleviated by enabling this plugin to also handle the @label attribute.

This would reduce the above to:

<filter mything>
    @type concat
    .. snip ..
    @label @NORMAL
    timeout_label @NORMAL
</filter>

<label @NORMAL>
    ... snip ...
</label>

But it appears that this plugin ignores @label: no warning is emitted, yet the label does not take effect, so the relabel plugin is still required for this behavior.

Is this an out of bounds request since this is a filter plugin, or does this seem like it would be a useful addition?

Concatenate split log line

This issue is reserved as a future work.

Problem

Docker splits huge log lines into several logs.

Steps to replicate

$ docker run --log-driver=fluentd -it perl perl -e 'print "START" ;print "X" x 24000; print "END\n"'

Future Expected Behavior

The first incoming split message contains "partial_message":"true" and the second message does not contain it.
They will then be concatenated into one line.

Currently, this specification is a work in progress.

Your environment

  • OS version
    Debian GNU/Linux Stretch
  • paste result of fluentd --version or td-agent --version
    % bundle exec fluentd --version
    fluentd 1.1.0
  • plugin version
    • HEAD of master (future work)

Fluent::Plugin::ConcatFilter::TimeoutError error="Timeout flush

Problem

I'm using the Kubernetes EFK addon, but I get timeout flush errors and data loss. When I remove the concat plugin from the config, there are no errors in the fluentd log. Is the config incorrect? Thank you for your help.

Steps to replicate

kind: ConfigMap
apiVersion: v1
metadata:
  name: fluentd-es-config-v0.1.6
  namespace: kube-system
  labels:
    addonmanager.kubernetes.io/mode: Reconcile
data:
  system.conf: |-
    <system>
      root_dir /tmp/fluentd-buffers/
    </system>

  containers.input.conf: |-
    <source>
      @id fluentd-containers.log
      @type tail
      path /var/log/containers/*.log
      pos_file /var/log/es-containers.log.pos
      exclude_path ["/var/log/containers/pvc*.log", "/var/log/containers/etcd*.log", "/var/log/containers/kibana*.log", "/var/log/containers/fluentd*.log"]
      tag raw.kubernetes.*
      <parse>
        @type multi_format
        <pattern>
          format json
          time_key time
          time_format %Y-%m-%dT%H:%M:%S.%NZ
        </pattern>
        <pattern>
          format /^(?<time>.+) (?<stream>stdout|stderr) [^ ]* (?<log>.*)$/
          time_format %Y-%m-%dT%H:%M:%S.%N%:z
        </pattern>
      </parse>
    </source>

    # Detect exceptions in the log output and forward them as one log entry.
    <match raw.kubernetes.**>
      @id raw.kubernetes
      @type detect_exceptions
      remove_tag_prefix raw
      message log
      stream stream
      multiline_flush_interval 5
      max_bytes 500000
      max_lines 1000
    </match>

  output.conf: |-
    # Enriches records with Kubernetes metadata
    <filter kubernetes.**>
      @type kubernetes_metadata
    </filter>

    # Concatenate multi-line logs
    # <filter **>
    #   @type concat
    #   key log
    #   multiline_start_regexp /\d{1,2}:\d{1,2}:\d{1,2}/
    # </filter>

    <match **>
      @id elasticsearch
      @type elasticsearch
      @log_level info
      type_name fluentd
      include_tag_key true
      host elasticsearch-logging
      port 9200
      logstash_format true
      <buffer>
        @type file
        path /var/log/fluentd-buffers/kubernetes.system.buffer
        flush_mode interval
        retry_type exponential_backoff
        flush_thread_count 5
        flush_interval 5s
        retry_forever
        retry_max_interval 30
        chunk_limit_size 2M
        queue_limit_length 8
        overflow_action throw_exception
      </buffer>
    </match>

Expected Behavior

When the concat filter is used, there should be no Timeout Flush error.

Your environment

  • OS version
    • kubernetes 1.9.4
    • docker 17.09.1-ce
    • fluentd 1.2.4
  • boot log
2019-03-21 08:41:50 +0000 [info]: fluent/log.rb:322:info: starting fluentd-1.2.4 pid=57 ruby="2.3.3"
2019-03-21 08:41:50 +0000 [info]: fluent/log.rb:322:info: spawn command to main:  cmdline=["/usr/bin/ruby2.3", "-Eascii-8bit:ascii-8bit", "/usr/local/bin/fluentd", "-version", "--under-supervisor"]
2019-03-21 08:41:51 +0000 [info]: fluent/log.rb:322:info: gem 'fluent-plugin-concat' version '2.3.0'
2019-03-21 08:41:51 +0000 [info]: fluent/log.rb:322:info: gem 'fluent-plugin-detect-exceptions' version '0.0.11'
2019-03-21 08:41:51 +0000 [info]: fluent/log.rb:322:info: gem 'fluent-plugin-elasticsearch' version '2.11.11'
2019-03-21 08:41:51 +0000 [info]: fluent/log.rb:322:info: gem 'fluent-plugin-kubernetes_metadata_filter' version '2.0.0'
2019-03-21 08:41:51 +0000 [info]: fluent/log.rb:322:info: gem 'fluent-plugin-multi-format-parser' version '1.0.0'
2019-03-21 08:41:51 +0000 [info]: fluent/log.rb:322:info: gem 'fluent-plugin-prometheus' version '1.0.1'
2019-03-21 08:41:51 +0000 [info]: fluent/log.rb:322:info: gem 'fluent-plugin-systemd' version '1.0.1'
2019-03-21 08:41:51 +0000 [info]: fluent/log.rb:322:info: gem 'fluentd' version '1.2.4'
2019-03-21 08:41:51 +0000 [info]: fluent/log.rb:322:info: adding match pattern="fluent.**" type="null"
2019-03-21 08:41:51 +0000 [info]: fluent/log.rb:322:info: adding match pattern="raw.kubernetes.**" type="detect_exceptions"
2019-03-21 08:41:51 +0000 [info]: fluent/log.rb:322:info: adding filter pattern="kubernetes.**" type="kubernetes_metadata"
  • fluent-gem list
activesupport (5.2.1)
addressable (2.5.2)
bigdecimal (1.2.8)
concurrent-ruby (1.1.3)
cool.io (1.5.3)
did_you_mean (1.0.0)
dig_rb (1.0.1)
domain_name (0.5.20180417)
elasticsearch (6.1.0)
elasticsearch-api (6.1.0)
elasticsearch-transport (6.1.0)
excon (0.62.0)
faraday (0.15.4)
ffi (1.9.25)
fluent-plugin-concat (2.3.0)
fluent-plugin-detect-exceptions (0.0.11)
fluent-plugin-elasticsearch (2.11.11)
fluent-plugin-kubernetes_metadata_filter (2.0.0)
fluent-plugin-multi-format-parser (1.0.0)
fluent-plugin-prometheus (1.0.1)
fluent-plugin-systemd (1.0.1)
fluentd (1.2.4)
http (0.9.8)
http-cookie (1.0.3)
http-form_data (1.0.3)
http_parser.rb (0.6.0)
i18n (1.1.1)
io-console (0.4.5)
json (1.8.3)
kubeclient (1.1.4)
lru_redux (1.1.0)
mime-types (3.2.2)
mime-types-data (3.2018.0812)
minitest (5.11.3, 5.9.0)
msgpack (1.2.4)
multi_json (1.13.1)
multipart-post (2.0.0)
net-telnet (0.1.1)

Examples would be cool

Honestly I still have no idea what this is good for. I come here from the fluentd docker doc (chapter: "Additional Step 2: Concatenate multiple lines log messages").

The example given there involves -e:4:in. I have no idea what it is or does. Please explain.

Add option to disable logging on timeouts

Problem

Some applications write supplementary information to log events
with indented lines. I would like to use the fluent-plugin-concat to join
these extra log lines with the initial, main line.
Not all log lines have such extra information.

Steps to replicate

I tried

    <filter **>
      @type concat
      key message
      multiline_start_regexp /^[^[:space:]]/
      flush_interval 2
      timeout_label @OUTPUT
    </filter>

and got lots of "Timeout flush: ..." log messages in fluentd.

Wanted Behavior

It would be nice if the timeout case could be optionally handled as a normal case without any extra info logging etc.

Sidenote: In the https://github.com/GoogleCloudPlatform/fluent-plugin-detect-exceptions plugin, this seems to be implemented for its special case.

Not able to concatenate multiline log separated in multiple events.

Problem

  • An xml event is tailed by fluentbit and sent as multiple events to fluentd.
  • Trying to concatenate them into a single event using the concat plugin, but they remain as multiple events
  • The requirement is to get the whole xml data between an open and close tag as a single event.
  • The tag is <testsuite ..> {data} </testsuite>

Steps to replicate

Please try out the config below for the sample data given.

Concat Plugin Config

<filter testresults.**>
  @type concat
  key log
  multiline_start_regexp /^<testsuite/
  max_lines 4000
  log_level debug
  timeout_label @TEST
</filter>

Sample Data:

<?xml version="1.0" encoding="UTF-8"?>
<testsuite name="Test" tests="6" skipped="0" failures="0" errors="0" timestamp="2019-08-14T09:42:30" hostname="users" time="1.054">
 <properties/>
 <testcase name="missingLatitude()" classname="ControllerTest" time="0.438"/>
 <testcase name="incorrectly()" 
classname="ControllerTest" time="0.125"/>
 <system-out><<![CDATA[2019-08-14 15:12:23.812  INFO 30116]]></system-out>
 <system-err><![CDATA[]]></system-err>
</testsuite>

Expected Behavior

  • The whole data between <testsuite *> and </testsuite> should be output as a single event from fluentd.
    Please comment if there are better ways of achieving this result; one possible configuration is sketched below.
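
A sketch under the assumption that the closing </testsuite> tag always starts its own line: pairing the start regexp with a multiline_end_regexp flushes the entry as soon as the closing tag arrives instead of waiting for the timeout. The match pattern and label are taken from the config above.

<filter testresults.**>
  @type concat
  key log
  multiline_start_regexp /^<testsuite/
  multiline_end_regexp /^<\/testsuite>/
  flush_interval 60
  timeout_label @TEST
</filter>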

Your environment

  • OS version - Ubuntu 18.10
  • paste result of fluentd --version or td-agent --version - td-agent 1.4.2
  • plugin version >= 2.0.0

process event as normal after flush_interval

Hi,
after flush_interval elapses, the plugin generates an error event, which throws everything off.
In some cases - like a multi-line log with long periods of inactivity - it would be extremely convenient to be able to configure the plugin to act as if multiline_end_regexp had just matched when the flush_interval timeout fires - this would allow normal processing of the lines already in the buffer.

how to merge two records according to one field value

My service log is like this; one id will match two records:
2018-10-10 unique_id_01 start
2018-10-10 unique_id_02 start
2018-10-10 unique_id_01 stop success
2018-10-10 unique_id_02 stop fail

And I want to merge the two records according to the id value, like this:
2018-10-10 unique_id_01 start stop success
2018-10-10 unique_id_02 start stop fail

Since partial_value would need multiple values, how can I use partial_key and partial_value to achieve this?
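
partial_key/partial_value are meant for Docker's partial_message flag rather than this case. One rough sketch, assuming the unique id has first been extracted into its own field (for example with a parser filter) so it can serve as the stream identity, and assuming every pair ends with a stop line; the tag and field names here are placeholders:

<filter service.log>
  @type concat
  key message
  stream_identity_key unique_id
  multiline_start_regexp / start$/
  multiline_end_regexp / stop /
  flush_interval 60
</filter>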

RuntimeError "can't add a new key into hash during iteration"

Problem

I am seeing occasional warnings with the following log output

2018-06-12 09:11:49 +0000 [warn]: #0 dump an error event: error_class=RuntimeError error="can't add a new key into hash during iteration" location="/fluentd/vendor/bundle/ruby/2.3.0/gems/fluent-plugin-concat-2.2.2/lib/fluent/plugin/filter_concat.rb:122:in `process'" tag="kubernetes.var.log.containers.REDACTED.log" time=2018-06-12 09:11:48.674985171 +0000 record={"log"=>"REDACTED", "stream"=>"stdout"}

It could look like some kind of time-related issue, as I am not seeing an overwhelming amount of these.

Your environment

$ uname -a
Linux fluentd-2wm7d 4.13.0-1018-azure #21-Ubuntu SMP Thu May 17 13:58:38 UTC 2018 x86_64 GNU/Linux
  • paste result of fluentd --version or td-agent --version
$ fluentd --version
fluentd 1.1.3
  • plugin version
    • paste boot log of fluentd or td-agent
    • paste result of fluent-gem list, td-agent-gem list or your Gemfile.lock
$ fluent-gem list
*** LOCAL GEMS ***

activesupport (5.2.0)
addressable (2.5.2)
azure-loganalytics-datacollector-api (0.1.2)
bigdecimal (1.2.8)
concurrent-ruby (1.0.5)
cool.io (1.5.3)
dig_rb (1.0.1)
domain_name (0.5.20180417)
elasticsearch (6.0.2)
elasticsearch-api (6.0.2)
elasticsearch-transport (6.0.2)
excon (0.62.0)
faraday (0.15.0)
ffi (1.9.23)
fluent-plugin-azure-loganalytics (0.3.1)
fluent-plugin-concat (2.2.2)
fluent-plugin-elasticsearch (2.9.2)
fluent-plugin-kubernetes_metadata_filter (2.1.2)
fluent-plugin-record-modifier (1.0.2)
fluent-plugin-systemd (1.0.0)
fluentd (1.1.3)
http (0.9.8)
http-cookie (1.0.3)
http-form_data (1.0.3)
http_parser.rb (0.6.0)
i18n (1.0.1)
io-console (0.4.5)
json (1.8.3)
kubeclient (1.1.4)
lru_redux (1.1.0)
mime-types (3.1)
mime-types-data (3.2016.0521)
minitest (5.11.3)
msgpack (1.2.4)
multi_json (1.13.1)
multipart-post (2.0.0)
netrc (0.11.0)
oj (3.5.1)
psych (2.1.0)
public_suffix (3.0.2)
rdoc (4.2.1)
recursive-open-struct (1.0.0)
rest-client (2.0.2)
serverengine (2.0.6)
sigdump (0.2.4)
strptime (0.2.3)
systemd-journal (1.3.1)
thread_safe (0.3.6)
tzinfo (1.2.5)
tzinfo-data (1.2018.5)
unf (0.1.4)
unf_ext (0.0.7.5)
yajl-ruby (1.4.0)

Getting "dump an error event: error_class=Fluent::Plugin::ConcatFilter::TimeoutError"

Hi team,

I'm using the concat plugin v2.1.0 for my FluentD container; the config is as follows:

<source>
  type forward
  port 24224
  bind 0.0.0.0
</source>

<filter *.*>
  @type concat
  key log
  separator ""
  stream_identity_key container_id
  multiline_start_regexp /^---SL---/
  multiline_end_regexp /^---EL---&/
  flush_interval 10
</filter>

<filter *.*>
  @type record_transformer

  #this allows ruby syntax in the below conversion
  enable_ruby true
  <record>
    log ${record["log"] != nil ? record["log"].sub('---SL---','') : ''}
  </record>
</filter>

<filter *.*>
  @type record_transformer

  #this allows ruby syntax in the below conversion
  enable_ruby true
  <record>
    log ${record["log"] != nil ? record["log"].sub('---EL---&','') : ''}
  </record>
</filter>

further processing

................................................

So my log event will indicate its start point with ---SL--- and its end point with ---EL---&.
There's a Java app running in another container that uses the fluentd logging driver.

The problem is that I'm getting a timeout flush for some random events, e.g.:

2017-09-26 07:27:16 +0000 [warn]: #0 fluent/log.rb:336:call: dump an error event: error_class=Fluent::Plugin::ConcatFilter::TimeoutError error="Timeout flush: docker.5eabe1bb5d52:5eabe1bb5d52251a978270f141ba2657c1e2ac3a5febfe081e48f0039aae7646" tag="docker.5eabe1bb5d52" time=#<Fluent::EventTime:0x007fd9c861a9c8 @sec=1506410836, @nsec=182951490> record={"container_name"=>"/gfast-sim-id-1-1", "source"=>"stdout", "log"=>"---SL---{\"date\":1506410826922,\"level\":\"INFO\",\"thread\":\"TestANV.1-1-1-Thread-1\",\"category\":\"com.alcatel.netconf.simulator.fwk.DeviceServer\",\"message\":\"TestANV.1-1-1 says hello with : NetconfClientInfo{username\\u003d\\u0027TLS-CLIENT\\u0027, sessionId\\u003d159, m_remoteHost\\u003d\\u0027anv\\u0027, m_remotePort\\u003d\\u00276524\\u0027}\"}---EL---&\r", "container_id"=>"5eabe1bb5d52251a978270f141ba2657c1e2ac3a5febfe081e48f0039aae7646"}

As you can see, the log event is complete; we don't wait for pieces of that event to concatenate. So I cannot understand why the timeout happened.
It occurs quite randomly, sometimes with TestANV.1-1-1, sometimes with TestANV.1-1-2 (I have 5 such entities).

Can someone please help?

Add configuration option to specify multiline_match_regexp instead of _end_regexp or flush interval

I have a high volume log (fast), with single and multiline messages coming in. Multiline messages are always together, however (thankfully). Right now, I'm using concat like:

<filter sp.*>
  @type concat
  key msg
  multiline_start_regexp /Multiline_Start/
  flush_interval 1s
  timeout_label "@NORMAL"
</filter>

and it works. However, if a normal, non-multiline message comes in before the flush_interval, it is also appended, which is obviously not ideal.
I would LOVE to use the _end_regexp, but my multiline messages don't have an obvious end (all but the first line are indented with a tab, however).

So I was hoping to get something like multiline_match_regexp /^\t.
When used with the start_regexp, the concat would consist of the first message (start_regexp) and all subsequent lines that match the multiline_match_regexp only. Once a line comes in that does not match the multiline_match_regexp, flush.

This would help me avoid gathering the wrong lines together with a flush interval, and properly detect the termination of my multiline (whenever a line comes in that doesn't match the multiline_match_regexp)

it doesn't seem like that much of a stretch...?

also thanks for the great plugin!

Timeout attempting to join multiline k8s logs

Problem

In my kubernetes system, I'm attempting to use concat to join logs that docker has split over multiple lines due to its 16KB line limit. However the example config to do this is giving me timeout errors and not processing the logs.

Steps to replicate

According to the README, I should be able to do something like this:

<filter kubernetes.**>
  @type concat
  key log
  stream_identity_key $.docker.container_id
  multiline_end_regexp /\\n$/
  separator ""
</filter>

to concatenate such lines. However when I do that, I'm getting (for example):

2019-07-30 01:48:26 +0000 [warn]: #0 dump an error event: error_class=Fluent::Plugin::ConcatFilter::TimeoutError error="Timeout flush: kubernetes.var.log.containers.cadvisor-v8ft7_kube-system_cadvisor-5d3d58c7dad831778b76a3398580a90169ddf5d22d8f231872390b7a3a619942.log:" location=nil tag="kubernetes.var.log.containers.cadvisor-v8ft7_kube-system_cadvisor-5d3d58c7dad831778b76a3398580a90169ddf5d22d8f231872390b7a3a619942.log" time=2019-07-30 01:48:26.120242540 +0000 record={"log"=>"W0730 01:47:22.610725       1 container.go:422] Failed to get RecentStats(\"/system.slice/run-12021.scope\") while determining the next housekeeping: unable to find data in memory cache\n", "stream"=>"stderr", "docker"=>{"container_id"=>"5d3d58c7dad831778b76a3398580a90169ddf5d22d8f231872390b7a3a619942"}, "kubernetes"=>{"container_name"=>"cadvisor", <BUNCH OF KUBERNETES STUFF REMOVED>}}

Note the presence of the "\n" at the end of the log field.

Expected Behavior

If my understanding's correct (which it's almost certainly not), that multiline_end_regexp parameter should cause any line with \n (as a string, not a literal newline) at the end of the log field to end any multiline handling and allow processing of the line to continue.

I've tried a bunch of small variations on the theme (leaving out stream_identity_key as I've seen in some examples, some slightly modified regexes etc) but to no avail.

Your environment

  • Amazon Linux
  • fluentd 1.4.2
  • concat version 2.3.0

Boot log

2019-07-30 02:24:54 +0000 [info]: parsing config file is succeeded path="/fluentd/etc/fluentd.conf"
2019-07-30 02:24:54 +0000 [info]: adding rewrite_tag_filter rule: SYSTEMD_UNIT [#<Fluent::PluginHelper::RecordAccessor::Accessor:0x00007f0c15b0fd60 @keys="SYSTEMD_UNIT">, /^(.+).service$/, "", "systemd.$1"]
2019-07-30 02:24:54 +0000 [info]: adding rewrite_tag_filter rule: SYSTEMD_UNIT [#<Fluent::PluginHelper::RecordAccessor::Accessor:0x00007f0c15b0e618 @keys="SYSTEMD_UNIT">, /!^(.+).service$/, "", "systemd.unmatched"]
2019-07-30 02:24:54 +0000 [info]: adding rewrite_tag_filter rule: class [#<Fluent::PluginHelper::RecordAccessor::Accessor:0x00007f0c1232ae68 @keys="class">, /audit/, "", "audit.${tag}"]
2019-07-30 02:24:54 +0000 [info]: adding rewrite_tag_filter rule: class [#<Fluent::PluginHelper::RecordAccessor::Accessor:0x00007f0c12333c48 @keys="class">, /audit/, "!", "standard.${tag}"]
2019-07-30 02:24:55 +0000 [warn]: both of Plugin @id and path for <storage> are not specified. Using on-memory store.
2019-07-30 02:24:55 +0000 [info]: using configuration file: <ROOT>
  <match fluent.**>
    @type null
  </match>
  <source>
    @type http
    port 9880
    bind "0.0.0.0"
  </source>
  <source>
    @type monitor_agent
    bind "0.0.0.0"
    port 24220
    tag "fluentd.monitor.metrics"
  </source>
  <source>
    @type prometheus
    bind "0.0.0.0"
    port 24231
    metrics_path "/metrics"
  </source>
  <source>
    @type prometheus_monitor
    interval 5
  </source>
  <source>
    @type prometheus_output_monitor
    interval 5
  </source>
  <source>
    @type prometheus_tail_monitor
    interval 5
  </source>
  <source>
    @type systemd
    read_from_head true
    tag "systemd"
    <entry>
      fields_strip_underscores true
    </entry>
  </source>
  <source>
    @type tail
    path "/var/log/containers/*.log"
    pos_file "/var/log/fluentd-containers.log.pos"
    tag "kubernetes.*"
    read_from_head true
    <parse>
      @type "json"
    </parse>
  </source>
  <match systemd>
    @type rewrite_tag_filter
    <rule>
      key "SYSTEMD_UNIT"
      pattern ^(.+).service$
      tag "systemd.$1"
    </rule>
    <rule>
      key "SYSTEMD_UNIT"
      pattern !^(.+).service$
      tag "systemd.unmatched"
    </rule>
  </match>
  <filter systemd.kubelet>
    @type parser
    format kubernetes
    reserve_data true
    key_name "MESSAGE"
    emit_invalid_record_to_error false
    <parse>
      @type kubernetes
      expression /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/m
      time_format %m%d %H:%M:%S.%N
    </parse>
  </filter>
  <filter systemd.docker>
    @type parser
    format /^time="(?<time>[^)]*)" level=(?<severity>[^ ]*) msg="(?<message>[^"]*)"( err="(?<error>[^"]*)")?( statusCode=($<status_code>\d+))?/
    reserve_data true
    key_name "MESSAGE"
    emit_invalid_record_to_error false
    <parse>
      @type regexp
      expression ^time="(?<time>[^)]*)" level=(?<severity>[^ ]*) msg="(?<message>[^"]*)"( err="(?<error>[^"]*)")?( statusCode=($<status_code>\d+))?
    </parse>
  </filter>
  <filter systemd.**>
    @type grep
    <exclude>
      key "SYSTEMD_UNIT"
      pattern (sshd@.*\.service)
    </exclude>
  </filter>
  <filter kubernetes.**>
    @type kubernetes_metadata
    verify_ssl false
  </filter>
  <filter kubernetes.**>
    @type concat
    key "log"
    stream_identity_key "$.docker.container_id"
    multiline_end_regexp "/\\\\n$/"
    separator ""
  </filter>
  <filter kubernetes.**>
    @type parser
    key_name "log"
    reserve_data true
    remove_key_name_field true
    <parse>
      @type "multi_format"
      <pattern>
        format json
      </pattern>
      <pattern>
        format none
      </pattern>
    </parse>
  </filter>
  <match kubernetes.**>
    @type rewrite_tag_filter
    <rule>
      key "class"
      pattern audit
      tag "audit.${tag}"
    </rule>
    <rule>
      key "class"
      pattern audit
      tag "standard.${tag}"
      invert true
    </rule>
  </match>
  <filter audit.**>
    @type parser
    key_name "msg"
    reserve_data true
    remove_key_name_field true
    hash_value_field "audit"
    <parse>
      @type "json"
    </parse>
  </filter>
  <match **>
    @type loggly_buffered
    loggly_url "https://logs-01.loggly.com/bulk/<SECRET KEY>"
    buffer_chunk_limit 2M
    buffer_queue_limit 32
    flush_interval 10s
    max_retry_wait 30
    disable_retry_limit 
    num_threads 8
    <buffer>
      flush_mode interval
      retry_type exponential_backoff
      flush_thread_count 8
      flush_interval 10s
      retry_forever 
      retry_max_interval 30
      chunk_limit_size 2M
      queue_limit_length 32
    </buffer>
  </match>
</ROOT>
2019-07-30 02:24:55 +0000 [info]: starting fluentd-1.4.2 pid=1 ruby="2.6.3"
2019-07-30 02:24:55 +0000 [info]: spawn command to main:  cmdline=["/usr/local/bin/ruby", "-Eascii-8bit:ascii-8bit", "/fluentd/vendor/bundle/ruby/2.6.0/bin/fluentd", "-c", "/fluentd/etc/fluentd.conf", "-p", "/fluentd/plugins", "--under-supervisor"]
2019-07-30 02:24:55 +0000 [info]: gem 'fluent-plugin-concat' version '2.3.0'
2019-07-30 02:24:55 +0000 [info]: gem 'fluent-plugin-grok-parser' version '2.5.1'
2019-07-30 02:24:55 +0000 [info]: gem 'fluent-plugin-json-in-json-2' version '1.0.2'
2019-07-30 02:24:55 +0000 [info]: gem 'fluent-plugin-kubernetes_metadata_filter' version '2.1.6'
2019-07-30 02:24:55 +0000 [info]: gem 'fluent-plugin-loggly' version '0.0.9'
2019-07-30 02:24:55 +0000 [info]: gem 'fluent-plugin-multi-format-parser' version '1.0.0'
2019-07-30 02:24:55 +0000 [info]: gem 'fluent-plugin-prometheus' version '1.3.0'
2019-07-30 02:24:55 +0000 [info]: gem 'fluent-plugin-rewrite-tag-filter' version '2.1.1'
2019-07-30 02:24:55 +0000 [info]: gem 'fluent-plugin-systemd' version '1.0.2'
2019-07-30 02:24:55 +0000 [info]: gem 'fluentd' version '1.4.2'
2019-07-30 02:24:55 +0000 [info]: adding match pattern="fluent.**" type="null"
2019-07-30 02:24:55 +0000 [info]: adding match pattern="systemd" type="rewrite_tag_filter"
2019-07-30 02:24:55 +0000 [info]: #0 adding rewrite_tag_filter rule: SYSTEMD_UNIT [#<Fluent::PluginHelper::RecordAccessor::Accessor:0x00007f7098314da8 @keys="SYSTEMD_UNIT">, /^(.+).service$/, "", "systemd.$1"]
2019-07-30 02:24:55 +0000 [info]: #0 adding rewrite_tag_filter rule: SYSTEMD_UNIT [#<Fluent::PluginHelper::RecordAccessor::Accessor:0x00007f709831efb0 @keys="SYSTEMD_UNIT">, /!^(.+).service$/, "", "systemd.unmatched"]
2019-07-30 02:24:55 +0000 [info]: adding filter pattern="systemd.kubelet" type="parser"
2019-07-30 02:24:55 +0000 [info]: adding filter pattern="systemd.docker" type="parser"
2019-07-30 02:24:55 +0000 [info]: adding filter pattern="systemd.**" type="grep"
2019-07-30 02:24:55 +0000 [info]: adding filter pattern="kubernetes.**" type="kubernetes_metadata"
2019-07-30 02:24:55 +0000 [info]: adding filter pattern="kubernetes.**" type="concat"
2019-07-30 02:24:55 +0000 [info]: adding filter pattern="kubernetes.**" type="parser"
2019-07-30 02:24:55 +0000 [info]: adding match pattern="kubernetes.**" type="rewrite_tag_filter"
2019-07-30 02:24:55 +0000 [info]: #0 adding rewrite_tag_filter rule: class [#<Fluent::PluginHelper::RecordAccessor::Accessor:0x00007f70949c23e8 @keys="class">, /audit/, "", "audit.${tag}"]
2019-07-30 02:24:55 +0000 [info]: #0 adding rewrite_tag_filter rule: class [#<Fluent::PluginHelper::RecordAccessor::Accessor:0x00007f70949c0a48 @keys="class">, /audit/, "!", "standard.${tag}"]
2019-07-30 02:24:55 +0000 [info]: adding filter pattern="audit.**" type="parser"
2019-07-30 02:24:55 +0000 [info]: adding match pattern="**" type="loggly_buffered"
2019-07-30 02:24:55 +0000 [info]: adding source type="http"
2019-07-30 02:24:55 +0000 [info]: adding source type="monitor_agent"
2019-07-30 02:24:55 +0000 [info]: adding source type="prometheus"
2019-07-30 02:24:55 +0000 [info]: adding source type="prometheus_monitor"
2019-07-30 02:24:55 +0000 [info]: adding source type="prometheus_output_monitor"
2019-07-30 02:24:55 +0000 [info]: adding source type="prometheus_tail_monitor"
2019-07-30 02:24:55 +0000 [info]: adding source type="systemd"
2019-07-30 02:24:55 +0000 [warn]: #0 both of Plugin @id and path for <storage> are not specified. Using on-memory store.
2019-07-30 02:24:55 +0000 [info]: adding source type="tail"
2019-07-30 02:24:55 +0000 [info]: #0 starting fluentd worker pid=14 ppid=1 worker=0
2019-07-30 02:24:56 +0000 [info]: #0 following tail of /var/log/containers/kube-proxy-qkxxs_kube-system_kube-proxy-02fe345165eb3413028ed59645493e4ee9e4c34004a76b44702f5e2b2d8e79ff.log
<...MORE CONTAINER LOGS...>
2019-07-30 02:24:56 +0000 [info]: #0 disable filter chain optimization because [Fluent::Plugin::KubernetesMetadataFilter, Fluent::Plugin::ConcatFilter] uses `#filter_stream` method.
<...MORE CONTAINER LOGS...>
2019-07-30 02:24:56 +0000 [info]: #0 disable filter chain optimization because [Fluent::Plugin::KubernetesMetadataFilter, Fluent::Plugin::ConcatFilter] uses `#filter_stream` method.
2019-07-30 02:24:56 +0000 [info]: #0 following tail of /var/log/containers/fluentd-brdcx_logging_fluentd-aee2d390b8083095142a2516941910d8939af8127bcba4709dc2dfd6c86423f5.log
2019-07-30 02:24:56 +0000 [info]: #0 disable filter chain optimization because [Fluent::Plugin::KubernetesMetadataFilter, Fluent::Plugin::ConcatFilter] uses `#filter_stream` method.
<...MORE CONTAINER LOGS...>
2019-07-30 02:24:56 +0000 [info]: #0 disable filter chain optimization because [Fluent::Plugin::KubernetesMetadataFilter, Fluent::Plugin::ConcatFilter] uses `#filter_stream` method.
<...MORE CONTAINER LOGS...>
2019-07-30 02:24:56 +0000 [info]: #0 fluentd worker is now running worker=0

fluent-gem list output

*** LOCAL GEMS ***

activesupport (5.2.3)
addressable (2.6.0)
bigdecimal (default: 1.4.1)
bundler (default: 1.17.2, 1.16.2)
cmath (default: 1.0.0)
concurrent-ruby (1.1.5)
cool.io (1.5.4)
csv (default: 3.0.9)
date (default: 2.0.0)
dbm (default: 1.0.0)
dig_rb (1.0.1)
domain_name (0.5.20180417)
e2mmap (default: 0.1.0)
etc (default: 1.0.1)
fcntl (default: 1.0.0)
ffi (1.11.1)
fiddle (default: 1.0.0)
fileutils (default: 1.1.0)
fluent-config-regexp-type (1.0.0)
fluent-plugin-concat (2.3.0)
fluent-plugin-grok-parser (2.5.1)
fluent-plugin-json-in-json-2 (1.0.2)
fluent-plugin-kubernetes_metadata_filter (2.1.6)
fluent-plugin-loggly (0.0.9)
fluent-plugin-multi-format-parser (1.0.0)
fluent-plugin-prometheus (1.3.0)
fluent-plugin-rewrite-tag-filter (2.1.1)
fluent-plugin-systemd (1.0.2)
fluentd (1.4.2)
forwardable (default: 1.2.0)
gdbm (default: 2.0.0)
http (0.9.8)
http-cookie (1.0.3)
http-form_data (1.0.3)
http_parser.rb (0.6.0)
i18n (1.6.0)
io-console (default: 0.4.7)
ipaddr (default: 1.2.2)
irb (default: 1.0.0)
json (default: 2.1.0)
kubeclient (1.1.4)
logger (default: 1.3.0)
lru_redux (1.1.0)
matrix (default: 0.1.0)
mime-types (3.2.2)
mime-types-data (3.2019.0331)
minitest (5.11.3)
msgpack (1.3.0)
mutex_m (default: 0.1.0)
net-http-persistent (2.9.4)
netrc (0.11.0)
oj (3.5.1)
openssl (default: 2.1.2)
ostruct (default: 0.1.0)
prime (default: 0.1.0)
prometheus-client (0.9.0)
psych (default: 3.1.0)
public_suffix (3.1.1)
quantile (0.2.1)
rdoc (default: 6.1.0)
recursive-open-struct (1.0.0)
rest-client (2.0.2)
rexml (default: 3.1.9)
rss (default: 0.2.7)
scanf (default: 1.0.0)
sdbm (default: 1.0.0)
serverengine (2.1.1)
shell (default: 0.7)
sigdump (0.2.4)
stringio (default: 0.0.2)
strptime (0.2.3)
strscan (default: 1.0.0)
sync (default: 0.5.0)
systemd-journal (1.3.3)
thread_safe (0.3.6)
thwait (default: 0.1.0)
tracer (default: 0.1.0)
tzinfo (1.2.5)
tzinfo-data (1.2019.2)
unf (0.1.4)
unf_ext (0.0.7.6)
webrick (default: 1.4.2)
yajl-ruby (1.4.1)
zlib (default: 1.0.0)

Docker buffering documentation

Problem

I think this project is going to have a popular use-case as more people move to Docker 1.13 (Kubernetes just upgraded), which added 16kb line limits, breaking logs up if they exceed that limit. Many people log "single-line json" in Docker, so the end result here is that a "\n" signals the end of the real line.

Would it be possible to get a working solution for this? Ideally documented but I'd be happy with one in an issue here too. :)

Using the standard in the test files:

messages = [
        { "host" => "example.com", "message" => "{\"key1\": \"value1\",\"key2\": \"value2\"}\n" },
        { "host" => "example.com", "message" => "{\"key3\": \"value3\",\"key4\": \"value4\"," },
        { "host" => "example.com", "message" => "\"key5\": \"value5\",\"key6\": \"value6\"," },
        { "host" => "example.com", "message" => "\"key7\": \"value7\",\"key8\": \"value8\"}\n" },
        { "host" => "example.com", "message" => "{\"key9\": \"value9\",\"key0\": \"value0\"," },
        { "host" => "example.com", "message" => "\"key1\": \"value1\",\"key2\": \"value2\"}\n" },
]

The expected output would be something like...

{ "host" => "example.com","message" => "{\"key1\": \"value1\",\"key2\": \"value2\"}\n" }
{ "host" => "example.com","message" => "{\"key3\": \"value3\",\"key4\": \"value4\",\"key5\": \"value5\",\"key6\": \"value6\",\"key7\": \"value7\",\"key8\": \"value8\"}\n" },
{ "host" => "example.com","message" => "{\"key9\": \"value9\",\"key0\": \"value0\",\"key1\": \"value1\",\"key2\": \"value2\"}\n" },

This should be fairly straightforward, but I get weird results where the first grouping comes out correctly but every line after that is on its own and not concatenated. I imagine I'm missing one of the other regex attributes that is needed.

For what it's worth, I'm using this for regex start:
/.*(?<!\\n)/

regexp for multiline

hello, I've got a multiline expression like multiline_start_regexp /(johnny)/
how can I change it for the case where, when stephan appears, it stops following johnny and starts following stephan, and then goes back to following johnny until the next stephan?
example for this log

johnny 
a
b
stephan
a
b
stephan
a
johnny
b
johnny
a

I want to get output like this:

1 johnny a b
2 stephan a b
3 stephan a johnny b
4 johnny a 

Concat does not work correctly for partial messages

Problem

Partial messages are concatenated incorrectly. Every partial message part (including the last part) has the flag partial_message = true, so it is impossible to concatenate partial messages correctly when the following messages are also too long and get partitioned.
It is also not enough to have only one flag when messages arrive concurrently.

Steps to replicate

docker run --log-driver=fluentd -it perl perl -e 'print "START" ;print "X" x 24000; print "END\n"'

For every part of the partial message, this will log the boolean partial_message as true in fluentd.

Expected Behavior

The plugin should use the partial log metadata (PartialLogMetaData) to concatenate log messages. The current version of Docker (moby) does not fill this metadata, as you can see here. So I created an issue to extend this metadata.

Is this the correct way of detecting multiline lines?

Just wanted to make sure I understand the multiline detection correctly:

Does the config below correctly detect the multiline entries?

<filter *.docker.*>
  @type concat
  key log
  stream_identity_key container_id
  multiline_start_regexp /^\S+/
  flush_interval 1m
</filter>

So if a line has non-whitespace followed by other stuff, then this is related to the previous line?

If I have log lines like:

ERROR org.apache.tomcat.jdbc.pool.ConnectionPool - Unable to create initial connections of pool.
org.postgresql.util.PSQLException: The connection attempt failed.
    at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:249)
    at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:66)
    at org.postgresql.jdbc2.AbstractJdbc2Connection.<init>(AbstractJdbc2Connection.java:127)
Caused by: java.net.UnknownHostException: dummy-db
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:184)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:54)
    ... 1 more

undefined method emit_events

Hi,

On v2.0, I got the following errors:

2016-11-04 09:31:38 +0000 [warn]: send an error event stream to @ERROR: error_class=NoMethodError error="undefined method `emit_events' for #<Fluent::Plugin::ConcatFilter:000000028e5600>" tag="kubernetes.specific.tools.var.log.containers.webapp-3286074076-6wpmw_default_webapp-2a00351541beb12781469252c8494447fba3b94bd9b6993c2509447fd9a85420.log"
2016-11-04 09:31:44 +0000 [warn]: send an error event stream to @ERROR: error_class=NoMethodError error="undefined method `emit_events' for #<Fluent::Plugin::ConcatFilter:000000028e5600>" tag="kubernetes.specific.tools.var.log.containers.webapp-3286074076-y3t9u_default_webapp-dd822bed80763b304a70f3e9a2e853530cf9b3df5070f208b989595cb0677110.log"
2016-11-04 09:31:44 +0000 [warn]: send an error event stream to @ERROR: error_class=NoMethodError error="undefined method `emit_events' for #<Fluent::Plugin::ConcatFilter:000000028e5600>" tag="kubernetes.specific.tools.var.log.containers.webapp-3286074076-6wpmw_default_webapp-2a00351541beb12781469252c8494447fba3b94bd9b6993c2509447fd9a85420.log"
2016-11-04 09:31:45 +0000 [warn]: send an error event stream to @ERROR: error_class=NoMethodError error="undefined method `emit_events' for #<Fluent::Plugin::ConcatFilter:000000028e5600>" tag="kubernetes.specific.tools.var.log.containers.webapp-3286074076-wd81i_default_webapp-6360b9f84bfcff0184ce7879577ce605cf6c51dae3752657bdf749e95a82520e.log"
2016-11-04 09:31:47 +0000 [warn]: send an error event stream to @ERROR: error_class=NoMethodError error="undefined method `emit_events' for #<Fluent::Plugin::ConcatFilter:000000028e5600>" tag="kubernetes.specific.tools.var.log.containers.webapp-3286074076-6wpmw_default_webapp-2a00351541beb12781469252c8494447fba3b94bd9b6993c2509447fd9a85420.log"

My configuration is:

2016-11-04 09:27:29 +0000 [info]: starting fluentd-0.14.8 without supervision
2016-11-04 09:27:29 +0000 [info]: gem 'fluent-plugin-concat' version '2.0.0'
2016-11-04 09:27:29 +0000 [info]: gem 'fluent-plugin-elasticsearch' version '1.8.0'
2016-11-04 09:27:29 +0000 [info]: gem 'fluent-plugin-kubernetes_metadata_filter' version '0.26.2'
2016-11-04 09:27:29 +0000 [info]: gem 'fluent-plugin-parser' version '0.6.1'
2016-11-04 09:27:29 +0000 [info]: gem 'fluent-plugin-prometheus' version '0.2.1'
2016-11-04 09:27:29 +0000 [info]: gem 'fluentd' version '0.14.1'
2016-11-04 09:27:29 +0000 [info]: gem 'fluentd' version '0.12.29'

thanks

Concat problem....

Problem

This is a message:

2018-11-21 15:41:39,418 ERROR [controllers.FALoggingController:postLog:100] Error logging
org.springframework.dao.DataIntegrityViolationException: could not execute batch; SQL [insert into falog (comment, data, deviceId, level, message, tag, timestamp, uploadId, userId, id) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)]; nested exception is org.hibernate.exception.DataException: could not execute batch
	at org.springframework.orm.jpa.vendor.HibernateJpaDialect.convertHibernateAccessException(HibernateJpaDialect.java:282)
	at org.springframework.orm.jpa.vendor.HibernateJpaDialect.translateExceptionIfPossible(HibernateJpaDialect.java:244)
	at org.springframework.orm.jpa.JpaTransactionManager.doCommit(JpaTransactionManager.java:521)
org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85)
	at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:963)
	at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:897)
	at 
Caused by: org.hibernate.exception.DataException: could not execute batch
	org.hibernate.engine.jdbc.batch.internal.BatchingBatch.performExecution(BatchingBatch.java:110)
	... 76 more
Caused by: org.postgresql.util.PSQLException: ERROR: value too long for type character varying(255)
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2412)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2125)
	... 80 more
2018-11-21 15:41:58,578 INFO  [works.FAWorksManager:getWorks:251] 90046571 -> Download works from server: 0 documents and 0 media details 

And my config for td-agent:

<filter apache.flexiapp.pa>
	@type concat
	key message
	key_name message
	stream_identity_key container_id

	multiline_start_regexp /^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3} .*/
	continue_line_regexp /^\s+at.*/
	continue_line_regexp /^\t+at.*/
	continue_line_regexp /^Caused by: .*/
	#continue_line_regexp /^... [0-9]+ more/
	#multiline_end_regexp /^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}/
	multiline_end_regexp /^... [0-9]+ more/

</filter>

Using partial_key incorrectly to concatenate docker logs

Problem

Hi, I'm having a problem trying to concatenate Docker logs split into several parts (15 logs). The way I use partial_key and partial_value is taken from the last split log. I'm wondering if the partial_key (partial_message) is supposed to be in every split log? Which split log should the partial_key be taken from?

Steps to replicate

Provide example config and message

Config File,

    <filter kubernetes.var.log.containers.my-go-api**>
      @type concat
      key log
      separator "/"
      partial_key requestURI
      partial_value "/api/v1/mt/epcis"
      use_first_timestamp true
    </filter>

1st Log,

{"log":"{\"_source\":\"handlers/handlers.go:107\",\"events\":[{\"BatchNumber\":\"11179709\",\"BizStep\":\"urn:epcglobal:cbv:bizstep:commissioning\",\"Disposition\":\"urn:epcglobal:cbv:disp:active\",\"EpcList\":{\"EPCID\":[\"(01)05057976000942....
.....
"stream":"stderr","time":"2019-11-12T16:46:24.399560431Z"}

15th Log,

{"log":"01696\\u003c/epc\\u003e\\u003cepc\\u003e(01)05057976000942(21)0000000001697\\u003c/epc...
.....
"requestURI\":\"/api/v1/mt/epcis\",\"severity\":\"info\",\"time\":\"2019-11-12T16:46:24.396915111Z\",\"timestamp\":\"2019-11-12T16:46:24.39690511Z\"}\n","stream":"stderr","time":"2019-11-12T16:46:24.399560431Z"}

Expected Behavior

Concatenate the split logs.

Your environment

  • OS version
    3.10.0-957.1.3.el7.x86_64
  • paste result of fluentd --version or td-agent --version
    1.6.3
  • plugin version
    2.4.0
    • paste boot log of fluentd or td-agent
    • paste result of fluent-gem list, td-agent-gem list or your Gemfile.lock
2019-11-12 23:27:53 +0000 [info]: starting fluentd-1.6.3 pid=1268 ruby="2.3.3"
2019-11-12 23:27:53 +0000 [info]: spawn command to main:  cmdline=["/usr/bin/ruby2.3", "-Eascii-8bit:ascii-8bit", "/usr/local/bin/fluentd", "--under-supervisor"]
2019-11-12 23:27:54 +0000 [info]: gem 'fluent-plugin-concat' version '2.4.0'
2019-11-12 23:27:54 +0000 [info]: gem 'fluent-plugin-detect-exceptions' version '0.0.12'
2019-11-12 23:27:54 +0000 [info]: gem 'fluent-plugin-elasticsearch' version '3.5.4'
2019-11-12 23:27:54 +0000 [info]: gem 'fluent-plugin-kubernetes_metadata_filter' version '2.2.0'
2019-11-12 23:27:54 +0000 [info]: gem 'fluent-plugin-multi-format-parser' version '1.0.0'
2019-11-12 23:27:54 +0000 [info]: gem 'fluent-plugin-prometheus' version '1.4.0'
2019-11-12 23:27:54 +0000 [info]: gem 'fluent-plugin-systemd' version '1.0.2'
2019-11-12 23:27:54 +0000 [info]: gem 'fluentd' version '1.6.3'
2019-11-12 23:27:54 +0000 [info]: adding match pattern="fluent.**" type="null"
2019-11-12 23:27:54 +0000 [info]: adding filter pattern="kubernetes.**" type="kubernetes_metadata"
2019-11-12 23:27:55 +0000 [info]: adding filter pattern="kubernetes.**" type="parser"
2019-11-12 23:27:55 +0000 [info]: adding filter pattern="kubernetes.**" type="grep"
2019-11-12 23:27:55 +0000 [info]: adding filter pattern="kubernetes.var.log.containers.my-go-api**" type="concat"
2019-11-12 23:27:55 +0000 [info]: adding match pattern="**" type="elasticsearch"
2019-11-12 23:27:56 +0000 [info]: adding match pattern="kubernetes.var.log.containers.fluentd**" type="null"
2019-11-12 23:27:56 +0000 [info]: adding source type="tail"
2019-11-12 23:27:56 +0000 [info]: #0 starting fluentd worker pid=1277 ppid=1268 worker=0

Plugin::ConcatFilter::TimeoutError: Timeout flush

I've got Plugin::ConcatFilter::TimeoutError: Timeout flush when I have just one message in the log.

Steps to replicate

In low-load environments I have ~1-2 log messages per minute/hour.
In this case I get Plugin::ConcatFilter::TimeoutError: Timeout flush.
But it's not really an error. I expect that after the timeout the message will be sent to the output, not marked as an error.

Concating by time in brackets

Hello, my logs look like Error [2018-10-01 15:12:31,323] ! at or INFO [2018-10-01 15:12:31,323]. I want to concatenate them by the time in those brackets, because of course when I'm concatenating by error or info I get infos in errors or errors in infos. Is this possible?
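
Matching the new-entry boundary on the bracketed timestamp rather than on the level keeps INFO and ERROR entries from being mixed. A sketch, assuming the log text is stored in a message field:

<filter **>
  @type concat
  key message
  multiline_start_regexp /\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}\]/
  flush_interval 60
</filter>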

Bug: Incorrect handling of lines matching both start and end regex when buffer not empty

Conditions:

  • Buffer is not empty
  • Line arrives which matches both multiline_start_regexp and multiline_end_regexp

Expected behavior:

  1. Contents of buffer concatenated & flushed
  2. New line emitted immediately

Observed behavior:

  1. Contents of buffer concatenated & flushed
  2. New line added to new buffer

Reproduction:

  • td-agent.conf:
<system>
    log_level trace
</system>

<source>
    @type tail
    tag test

    path '/var/log/td-agent/test.log'

    read_from_head true

    format none
</source>


<filter test.**>
    @type record_transformer

    <record>
        label 'Pre-concat'
    </record>
</filter>
<filter test.**>
    @type stdout
</filter>


<filter test.**>
    @type concat

    key message

    multiline_start_regexp /Start/
    multiline_end_regexp   /End/

    flush_interval 10
    timeout_label @FLUSH
    use_first_timestamp true
</filter>


<filter test.**>
    @type record_transformer

    <record>
        label 'Concat-out'
    </record>
</filter>
<filter test.**>
    @type stdout
</filter>
<match **>
    @type null
</match>


<label @FLUSH>
    <filter test.**>
        @type record_transformer

        <record>
            label 'Concat-flush'
        </record>
    </filter>
    <filter test.**>
        @type stdout
    </filter>
    <match **>
        @type null
    </match>
</label>
  • Test log generator '/etc/td-agent/test.sh':
#!/usr/bin/env bash

echo "$(date) Start (Will be bumped)"
sleep 5
echo "$(date) Start End (Should return immediately; times out instead)"
  • Execution:
touch '/var/log/td-agent/test.log' && chown td-agent:td-agent '/var/log/td-agent/test.log'
service td-agent start
sh /etc/td-agent/test.sh >> '/var/log/td-agent/test.log'
  • Output:
2016-08-22 17:30:31 -0400 [info]: reading config file path="/etc/td-agent/td-agent.conf"
2016-08-22 17:30:31 -0400 [info]: starting fluentd-0.12.26
2016-08-22 17:30:31 -0400 [trace]: registered buffer plugin 'file'
2016-08-22 17:30:31 -0400 [trace]: registered buffer plugin 'memory'
2016-08-22 17:30:31 -0400 [trace]: registered filter plugin 'grep'
2016-08-22 17:30:31 -0400 [trace]: registered filter plugin 'record_transformer'
2016-08-22 17:30:31 -0400 [trace]: registered filter plugin 'stdout'
2016-08-22 17:30:31 -0400 [trace]: registered input plugin 'debug_agent'
2016-08-22 17:30:31 -0400 [trace]: registered input plugin 'dummy'
2016-08-22 17:30:31 -0400 [trace]: registered input plugin 'exec'
2016-08-22 17:30:31 -0400 [trace]: registered input plugin 'forward'
2016-08-22 17:30:31 -0400 [trace]: registered input plugin 'gc_stat'
2016-08-22 17:30:31 -0400 [trace]: registered input plugin 'http'
2016-08-22 17:30:31 -0400 [trace]: registered input plugin 'monitor_agent'
2016-08-22 17:30:31 -0400 [trace]: registered input plugin 'object_space'
2016-08-22 17:30:31 -0400 [trace]: registered input plugin 'status'
2016-08-22 17:30:31 -0400 [trace]: registered input plugin 'unix'
2016-08-22 17:30:31 -0400 [trace]: registered input plugin 'syslog'
2016-08-22 17:30:31 -0400 [trace]: registered input plugin 'tail'
2016-08-22 17:30:31 -0400 [trace]: registered input plugin 'tcp'
2016-08-22 17:30:31 -0400 [trace]: registered input plugin 'udp'
2016-08-22 17:30:31 -0400 [trace]: registered output plugin 'copy'
2016-08-22 17:30:31 -0400 [trace]: registered output plugin 'exec'
2016-08-22 17:30:31 -0400 [trace]: registered output plugin 'exec_filter'
2016-08-22 17:30:31 -0400 [trace]: registered output plugin 'file'
2016-08-22 17:30:31 -0400 [trace]: registered output plugin 'forward'
2016-08-22 17:30:31 -0400 [trace]: registered output plugin 'null'
2016-08-22 17:30:31 -0400 [trace]: registered output plugin 'relabel'
2016-08-22 17:30:31 -0400 [trace]: registered output plugin 'roundrobin'
2016-08-22 17:30:31 -0400 [trace]: registered output plugin 'stdout'
2016-08-22 17:30:31 -0400 [trace]: registered output plugin 'tcp'
2016-08-22 17:30:31 -0400 [trace]: registered output plugin 'unix'
2016-08-22 17:30:31 -0400 [info]: gem 'fluent-mixin-config-placeholders' version '0.4.0'
2016-08-22 17:30:31 -0400 [info]: gem 'fluent-mixin-plaintextformatter' version '0.2.6'
2016-08-22 17:30:31 -0400 [info]: gem 'fluent-plugin-concat' version '0.6.0'
2016-08-22 17:30:31 -0400 [info]: gem 'fluent-plugin-concat' version '0.5.0'
2016-08-22 17:30:31 -0400 [info]: gem 'fluent-plugin-forest' version '0.3.1'
2016-08-22 17:30:31 -0400 [info]: gem 'fluent-plugin-mongo' version '0.7.13'
2016-08-22 17:30:31 -0400 [info]: gem 'fluent-plugin-multi-format-parser' version '0.0.2'
2016-08-22 17:30:31 -0400 [info]: gem 'fluent-plugin-rewrite-tag-filter' version '1.5.5'
2016-08-22 17:30:31 -0400 [info]: gem 'fluent-plugin-s3' version '0.6.8'
2016-08-22 17:30:31 -0400 [info]: gem 'fluent-plugin-sar' version '0.0.4'
2016-08-22 17:30:31 -0400 [info]: gem 'fluent-plugin-scribe' version '0.10.14'
2016-08-22 17:30:31 -0400 [info]: gem 'fluent-plugin-secure-forward' version '0.4.3'
2016-08-22 17:30:31 -0400 [info]: gem 'fluent-plugin-systemd' version '0.0.3'
2016-08-22 17:30:31 -0400 [info]: gem 'fluent-plugin-tail-multiline' version '0.1.5'
2016-08-22 17:30:31 -0400 [info]: gem 'fluent-plugin-td' version '0.10.28'
2016-08-22 17:30:31 -0400 [info]: gem 'fluent-plugin-td-monitoring' version '0.2.2'
2016-08-22 17:30:31 -0400 [info]: gem 'fluent-plugin-webhdfs' version '0.4.2'
2016-08-22 17:30:31 -0400 [info]: gem 'fluentd' version '0.12.26'
2016-08-22 17:30:31 -0400 [info]: gem 'fluentd' version '0.10.61'
2016-08-22 17:30:31 -0400 [info]: adding filter in @FLUSH pattern="test.**" type="record_transformer"
2016-08-22 17:30:31 -0400 [info]: adding filter in @FLUSH pattern="test.**" type="stdout"
2016-08-22 17:30:31 -0400 [info]: adding match in @FLUSH pattern="**" type="null"
2016-08-22 17:30:31 -0400 [info]: adding filter pattern="test.**" type="record_transformer"
2016-08-22 17:30:31 -0400 [info]: adding filter pattern="test.**" type="stdout"
2016-08-22 17:30:31 -0400 [info]: adding filter pattern="test.**" type="concat"
2016-08-22 17:30:31 -0400 [trace]: registered filter plugin 'concat'
2016-08-22 17:30:31 -0400 [info]: adding filter pattern="test.**" type="record_transformer"
2016-08-22 17:30:31 -0400 [info]: adding filter pattern="test.**" type="stdout"
2016-08-22 17:30:31 -0400 [info]: adding match pattern="**" type="null"
2016-08-22 17:30:31 -0400 [info]: adding source type="tail"
2016-08-22 17:30:31 -0400 [warn]: 'pos_file PATH' parameter is not set to a 'tail' source.
2016-08-22 17:30:31 -0400 [warn]: this parameter is highly recommended to save the position to resume tailing.
2016-08-22 17:30:31 -0400 [info]: using configuration file: <ROOT>
  <system>
    log_level trace
  </system>
  <source>
    @type tail
    tag test
    path /var/log/td-agent/test.log
    read_from_head true
    format none
  </source>
  <filter test.**>
    @type record_transformer
    <record>
      label Pre-concat
    </record>
  </filter>
  <filter test.**>
    @type stdout
  </filter>
  <filter test.**>
    @type concat
    key message
    multiline_start_regexp /Start/
    multiline_end_regexp /End/
    flush_interval 10
    timeout_label @FLUSH
    use_first_timestamp true
  </filter>
  <filter test.**>
    @type record_transformer
    <record>
      label Concat-out
    </record>
  </filter>
  <filter test.**>
    @type stdout
  </filter>
  <match **>
    @type null
  </match>
  <label @FLUSH>
    <filter test.**>
      @type record_transformer
      <record>
        label Concat-flush
      </record>
    </filter>
    <filter test.**>
      @type stdout
    </filter>
    <match **>
      @type null
    </match>
  </label>
</ROOT>
2016-08-22 17:30:31 -0400 [info]: following tail of /var/log/td-agent/test.log
2016-08-22 17:30:39 -0400 test: {"message":"Mon Aug 22 17:30:39 EDT 2016 Start (Will be bumped)","label":"Pre-concat"}
2016-08-22 17:30:44 -0400 test: {"message":"Mon Aug 22 17:30:44 EDT 2016 Start End (Should return immediately; times out instead)","label":"Pre-concat"}
2016-08-22 17:30:44 -0400 test: {"message":"Mon Aug 22 17:30:39 EDT 2016 Start (Will be bumped)","label":"Concat-out"}
2016-08-22 17:30:54 -0400 test: {"message":"Mon Aug 22 17:30:44 EDT 2016 Start End (Should return immediately; times out instead)","label":"Concat-flush"}
2016-08-22 17:30:54 -0400 [info]: Timeout flush: test:default

Suggested fix:

The line matching logic should look something like this:

if buffer is empty:
    if line is startline:
        buffer.append(line)
    else:
        return line
else:
    if line is startline:
        buffer.flush()

    buffer.append(line)

    if line is endline:
        buffer.flush()

Add max_lines parameter to limit total message size

Suggestion

Add a max_lines parameter, used along with partial_key, to limit the number of lines concatenated into a single message. A possible max_behavior parameter could indicate what to do when the max is hit (truncate, new, drop).

Rationale

Sometimes messages that are broken up by Docker and reassembled by Fluentd via concat are too big for a later part of the pipeline. Docker breaks up messages to avoid out-of-memory errors; if we reconstruct the full message, it can trigger an OOME in Fluentd instead, especially when there are many such messages. Using max_lines to limit the total number of lines concatenated effectively truncates logs that exceed a certain length. In my case, messages from Docker are broken into 16 KB chunks, but I emit to Kafka, which we limit to 1 MB. With additional metadata/headers added, we need to limit the maximum number of lines to ~57 to avoid overrunning the Kafka limit.

Example config

<filter>
  @type concat
  stream_identity_key docker_id
  key message
  partial_key partial_message
  partial_value true
  separator ""
  flush_interval 5
  max_lines 57
  max_behavior truncate
</filter>

Timeout flush [dump an error event]

I am testing a simple setup to concatenate multiple log events that are the output of a Docker container running a Java program.

The setup is very simple:
Java App (Docker) ----> FluentD (Docker) [output to console]

When I intentionally throw an error inside the Java app, the logs are sent to Fluentd, but at certain moments I get an error which I cannot explain.

Error message:
2016-05-26 13:35:59 +0000 [warn]: dump an error event: error_class=Fluent::ConcatFilter::TimeoutError error="Timeout flush: ...... etc

The result of this error is that the original log message cannot be parsed.

<source>
  @type forward
  port 24224
  bind 0.0.0.0
</source>

<filter *.docker.*>
  @type concat
  key log
  stream_identity_key container_id
  multiline_start_regexp /^\S+/
  flush_interval 5s
</filter>

<match **>
    @type stdout
</match>

Any suggestions on how this can be solved?
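One approach that appears elsewhere in this thread, offered only as a hedged sketch: give the filter a timeout_label so a buffer flushed by the timeout is routed through a label and handled like a normal record instead of an error event (the @NORMAL label name is an arbitrary choice here).

<filter *.docker.*>
  @type concat
  key log
  stream_identity_key container_id
  multiline_start_regexp /^\S+/
  flush_interval 5s
  # route buffers flushed by the timeout to the @NORMAL label instead of emitting an error event
  timeout_label @NORMAL
</filter>

<label @NORMAL>
  <match **>
    @type stdout
  </match>
</label>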

multiline_start_regexp doesn't match if the line contains escaped character

Hello,

Problem

I'm not so familiar with Ruby regexps, so please forgive me if this is basic behaviour, but it seems that the multiline_start_regexp pattern doesn't match if the target field contains an escaped character, in my case an escaped double quote. Without the escaped characters it works fine. I checked my pattern on rubular.com (a regexp tester) and, according to that page, the pattern partially matches the given text.
Could you please help me?

Steps to replicate

Test line:

{"docker": {"container_id": "1"}, "log":"time=\"2018-07-19T10:29:24.645Z\" level=\"INFO\" msg=\"test msg\"","stream":"stdout","time":"2018-07-19T10:29:24.645950499Z"}

Config:

    <source>
      @id containers.log
      @type tail
      path /var/log/concat/*.log
      pos_file /var/log/concat/es-containers.log.pos
      time_format %Y-%m-%dT%H:%M:%S.%NZ
      tag raw.kubernetes.*
      format json
      read_from_head true
    </source>
    <filter **>
      @type concat
      key log
      separator ""
      stream_identity_key docker.container_id
      multiline_start_regexp /^time=/
      flush_interval 5
      timeout_label @NORMAL
    </filter>

Expected Behavior

The given pattern should match the example line, so the plugin should start concatenating it with the following entries.

Your environment

fluentd 1.2.2
fluent-plugin-concat 2.3.0

Can the value of "stream_identity_key" use a subkey of the first level?

log example:

2019-04-03 03:30:56.845755100 +0000 kubernetes.var.log.containers.fluentd-8f6fh_kube-system_fluentd-d13f076d4dbc092a4f78eae6a8deae7abbd48c9a9965ed4cdafdb2aa767f450b.log:
{
  "log": "2019-04-03 03:30:55.test",
  "stream": "stdout",
  "docker": {
    "container_id": "d13f076d4dbc092a4f78eae6a8deae7abbd48c9a9965ed4cdafdb2aa767f450b"
  },
  "kubernetes": {
    "container_name": "fluentd",
    "namespace_name": "kube-system",
    "pod_name": "fluentd-8f6fh",
    "container_image": "docker.io/w564791/fluentd:elasticsearch-v8",
    "container_image_id": "docker-pullable://docker.io/w564791/fluentd@sha256:c620a054d033033404fbc8ebb05269d6235421d371198010b48eb4570fe5072b",
    "pod_id": "db17e7d6-55c0-11e9-a976-00163e2e35e9",
    "labels": {
      "controller-revision-hash": "2669475987",
      "k8s-app": "fluentd-logging",
      "pod-template-generation": "11",
      "version": "v1",
      "kubernetes_io/cluster-service": "true"
    },
    "host": "192.168.3.132",
    "master_url": "https://10.254.0.1:443/api",
    "namespace_id": "c29b8d70-3174-11e8-9e76-00163e2e35e9"
  }
}

Can the value of "stream_identity_key" refer to a subkey below the first level, like docker.container_id?
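A hedged workaround sketch, assuming stream_identity_key only accepts top-level keys: copy the nested value to a top-level field with record_transformer first, then point stream_identity_key at that field (the kubernetes.** pattern and the container_id field name are just placeholders).

<filter kubernetes.**>
  @type record_transformer
  enable_ruby true
  <record>
    # copy docker.container_id up to the top level
    container_id ${record["docker"]["container_id"]}
  </record>
</filter>

<filter kubernetes.**>
  @type concat
  key log
  stream_identity_key container_id
  multiline_start_regexp /^\d{4}-\d{2}-\d{2}/
</filter>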

Feature request: new configuration: 'timeout_is_error'

I request that a new configuration option be added: timeout_is_error (type boolean, default true). When it is true and the flush_interval expires on a buffer, an error event is generated and the content of the buffer is output under the timeout_label label (the current behavior); when it is false, the content of the buffer is emitted as if it had completed normally and no error is generated.

I have a number of logs which I process using concat. In each case, I can tell when a new record is starting (because it starts with a timestamp), but I have no reliable way of telling when one ends, except by waiting for the next log entry to start. As a result, I get lots of bogus timeout errors from concat. Dealing with this adds a lot of pointless, hard-to-maintain complexity and repetition to my Fluentd configuration files. It would be much easier if I could just tell concat that no, those timeouts are not errors and it should not treat them as such.
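A sketch of how the proposed option might look, purely hypothetical since timeout_is_error does not exist in the plugin today (the tag pattern and regexp are placeholders):

<filter app.**>
  @type concat
  key message
  multiline_start_regexp /^\d{4}-\d{2}-\d{2}/
  flush_interval 30
  # hypothetical option proposed in this issue: emit the buffer as a normal event on timeout
  timeout_is_error false
</filter>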

Add config to disable firstline matching lastline regex

I'm trying to concatenate Java logs, and the only way to know when to stop matching is to see a 'normal' line again, which also happens to match the first-line pattern exactly.

My ideal scenario would be to start accumulating lines when multiline_start_regexp matches and to end the block on, but exclude, the line matching multiline_end_regexp.

Idea?

My current config:

<filter docker.**>
  @type concat
  key log
  multiline_start_regexp /^\d{4}-\d{1,2}-\d{1,2} \d{2}:\d{2}:\d{2}.\d{1,3}/
  multiline_end_regexp /^\d{4}-\d{1,2}-\d{1,2} \d{2}:\d{2}:\d{2}.\d{1,3}/
  stream_identity_key container_id
  use_first_timestamp true
  flush_interval 1s
</filter>

example log (simplified, actual log is from docker fluentd logger):

2016-09-07 13:17:40.848  WARN 23 --- [nfoReplicator-0] c.n.discovery.InstanceInfoReplicator     : There was a problem with the instance info replicator

com.netflix.discovery.shared.transport.TransportException: Cannot execute request on any known server
     at com.netflix.discovery.shared.execute(RetryableEurekaHttpClient.java:111) ~[eureka-client-1.4.10.jar:1.4.10]
     at com.netflix.discovery.shared.register(EurekaHttpClientDecorator.java:56) ~[eureka-client-1.4.10.jar:1.4.10]
     at java.lang.Thread.run(Thread.java:745) [na:1.8.0_92-internal]

2016-09-07 13:17:40.726  INFO 23 --- [nfoReplicator-0] com.netflix.discovery.DiscoveryClient    : DiscoveryClient_SERVICE-AUTH/c500a64ed52c:service-auth:9000: registering service...
2016-09-07 13:17:40.801  INFO 23 --- [  restartedMain] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
2016-09-07 13:17:40.827 ERROR 23 --- [nfoReplicator-0] c.n.d.s.t.d.RedirectingEurekaHttpClient  : Request execution error

com.sun.jersey.api.client.ClientHandlerException: java.net.UnknownHostException: service-registry
     at com.sun.jersey.client.handle(ApacheHttpClient4Handler.java:187) ~[jersey-apache-client4-1.19.1.jar:1.19.1]
     at com.sun.jersey.api.client.handle(GZIPContentEncodingFilter.java:123) ~[jersey-client-1.19.1.jar:1.19.1]
     at com.netflix.discovery.EurekaIdentityHeaderFilter.handle(EurekaIdentityHeaderFilter.java:27) ~[eureka-client-1.4.10.jar:1.4.10]

Multi-line message should keep timestamp of first line

If multiple lines are concatenated, the resulting event should have the timestamp of the first event in that list, not the last. Similarly, if an event is emitted due to timeout, it should carry the timestamp of the first element, not the time at which the timeout occurred.
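For reference, the existing use_first_timestamp option appears to cover this request; a minimal sketch, with the tag pattern and regexp as placeholders:

<filter app.**>
  @type concat
  key message
  multiline_start_regexp /^\d{4}-\d{2}-\d{2}/
  # emit the concatenated event with the timestamp of the first buffered record
  use_first_timestamp true
</filter>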

Emit last received log event as normal event, after flush_interval has passed?

Currently, the last log lines remaining in the buffer are emitted as an error event after the flush_interval has passed. This means I also need to place log handling in the @ERROR label to handle the last message.

When the flush_interval has passed, can the last log event be processed as a normal log line?

An example scenario:

  • an application boots up and then just waits to do something; the last log line will be emitted as an error event, but this is not a real error, the application is simply idle.

n_lines

Hello
Is it possible to make n_lines mean "at least n lines"? For some logs, for example the MySQL slow log, the number of lines per entry varies.
Thanks a lot
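When entries have a variable number of lines, a start-line regexp is the usual alternative to n_lines. A hedged sketch for the MySQL slow log, assuming each entry begins with a "# Time:" or "# User@Host:" header line (this varies by MySQL version and settings):

<filter mysql.slow>
  @type concat
  key message
  # treat each header line as the start of a new slow-log entry (format is an assumption)
  multiline_start_regexp /^# (Time|User@Host):/
  flush_interval 5
</filter>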

Concat plugin not concatenating multiple lines

Problem

Concat is not working for multiline logs in a Kubernetes environment. I am using the fluentd kinesis and concat plugins.

I have two scenarios for which it's not working:

  1. Multiline logs separated by \n and starting with a timestamp - only the first line of the multiline log reaches the destination; the other lines are lost.
    Example: Testing line 1 \n Testing line 2 \n Testing line 3
    The log is lost and does not reach the destination.
  2. Java exception stack traces spanning multiple lines - the stack traces reach the destination as multiple, jumbled records; they are not sent as a single record after concatenation.

This is my fluentd configuration

# concat multiline logs if the first line starts with a date in a specific format
    <filter kubernetes.**>
      @type concat
      key message
      multiline_start_regexp /^\d{4}-\d{1,2}-\d{1,2}/
      stream_identity_key stream
      flush_interval 2
      use_first_timestamp true
      timeout_label @NORMAL
    </filter>

I am using concat plugin version 2.3.0

Also, I am not clear on the usage of stream_identity_key. What is it used for?
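On the stream_identity_key question: it names the field whose value identifies a stream, so lines coming from different streams (for example different containers, or stdout vs stderr) are buffered and concatenated separately instead of being interleaved in one buffer. A minimal sketch, with the tag pattern and field names as placeholders:

<filter docker.**>
  @type concat
  key log
  multiline_start_regexp /^\d{4}-\d{1,2}-\d{1,2}/
  # records with different container_id values get independent buffers
  stream_identity_key container_id
</filter>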

Concatenating logs results in newline being added

When concatenating two logs into one, a newline is appended at the separation between the two logs.

If the log is JSON, with bad luck we can have an escape character (a backslash) just before this newline, making the JSON parser unable to read it.

This is easily dealt with using a record_transformer, but would it be a good feature to handle this built-in?
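One built-in option that may already cover this, offered as a hedged note: the separator parameter controls what is inserted between concatenated fragments and defaults to a newline, so setting it to an empty string avoids adding anything (the tag pattern and partial_key name are placeholders):

<filter docker.**>
  @type concat
  key log
  partial_key partial_message
  partial_value true
  # join fragments with nothing instead of the default "\n"
  separator ""
</filter>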

Can not install last release

I'm using this plugin to build led. I've noticed that you've released v1.0.0 and v2.0.0 recently. I got the following error:

Building native extensions.  This could take a while...
ERROR:  Error installing fluent-plugin-concat:
    ERROR: Failed to build gem native extension.

    current directory: /root/.gem/ruby/2.3.0/gems/msgpack-1.0.0/ext/msgpack
/usr/bin/ruby -r ./siteconf20161007-13-147n85y.rb extconf.rb
mkmf.rb can't find header files for ruby at /usr/lib/ruby/include/ruby.h

extconf failed, exit code 1

Gem files will remain installed in /root/.gem/ruby/2.3.0/gems/msgpack-1.0.0 for inspection.
Results logged to /root/.gem/ruby/2.3.0/extensions/x86_64-linux/2.3.0/msgpack-1.0.0/gem_make.out

I've resolved the error by sticking with version 0.6.2 like this

gem install fluent-plugin-concat -v 0.6.2

Any ideas?
Thanks.

I need a final recipe from [warn]: dump an error event: error_class=Fluent::Plugin::ConcatFilter::TimeoutError error="Timeout flush: kubernetes.var.log

Problem
I am setting up an optional fluentd filter that uses the concat plugin. After adding the new filter, I got a lot of errors. I see that concat cannot process many of the messages, and I have begun to lose logs.

2019-11-25 17:25:58 +0000 [warn]: dump an error event: error_class=Fluent::Plugin::ConcatFilter::TimeoutError error="Timeout flush: kubernetes.var.log.containers.core-deployment-prod-8459fd75c7-x4vq2_core-prod_core-prod-279427d134fe033554565456345354564895667830d6.log:" location=nil tag="kubernetes.var.log.containers.core-deployment-prod-8459fd75c7-x4vq2_core-prod_core-prod-279427d134fe033554565456345354564895667830d6.log" time=2019-11-25 17:25:58.009520360 +0000 record={"log"=>"2019-11-25 17:25:47 [WRN] QuestionSalePointService: BatchCreateOrUpdateAsync: finish Memory usage:379.089324951172 <s:>\n", "stream"=>"stdout"}

I found workarounds on the net, but they do not help me:
#37
fluent/fluentd#2587
#4
https://stackoverflow.com/questions/37159521/flush-timeouterror-in-fluentd

Steps to replicate

The part of the config that is responsible for this filter is:

<filter kubernetes.var.log.containers.core-deployment-**>
  @type concat
  key log
  stream_identity_key tag
  multiline_start_regexp /^(?<time>\d{4}-\d{1,2}-\d{1,2} \d{1,2}:\d{1,2}:\d{1,2}) \[(?<level>[^\]\\]+)\] (?<message>.*)/
  flush_interval 10s
</filter>

Expected Behavior

After adding an additional filter to the original fluentd config https://github.com/kubernetes/kubernetes/blob/master/cluster/addons/fluentd-elasticsearch/fluentd-es-configmap.yaml, I start to lose logs with an error

Your environment

K8S 1.13.5

  • OS version
    I use quay.io/fluentd_elasticsearch/fluentd:v2.7.0 docker image
    It is EFK solution for K8S

  • paste result of fluentd --version or td-agent --version
    fluentd 1.6.3

  • plugin version

    • paste boot log of fluentd or td-agent
    • paste result of fluent-gem list, td-agent-gem list or your Gemfile.lock
      *** LOCAL GEMS ***

activesupport (5.2.3)
addressable (2.6.0)
bigdecimal (1.2.8)
concurrent-ruby (1.1.5)
cool.io (1.5.4)
did_you_mean (1.0.0)
dig_rb (1.0.1)
domain_name (0.5.20190701)
elasticsearch (7.3.0)
elasticsearch-api (7.3.0)
elasticsearch-transport (7.3.0)
excon (0.65.0)
faraday (0.15.4)
ffi (1.11.1)
fluent-plugin-concat (2.4.0)
fluent-plugin-detect-exceptions (0.0.12)
fluent-plugin-elasticsearch (3.5.4)
fluent-plugin-kubernetes_metadata_filter (2.2.0)
fluent-plugin-multi-format-parser (1.0.0)
fluent-plugin-prometheus (1.4.0)
fluent-plugin-systemd (1.0.2)
fluentd (1.6.3)
http (0.9.8)
http-cookie (1.0.3)
http-form_data (1.0.3)
http_parser.rb (0.6.0)
i18n (1.6.0)
io-console (0.4.5)
json (1.8.3)
kubeclient (1.1.4)
lru_redux (1.1.0)
mime-types (3.2.2)
mime-types-data (3.2019.0331)
minitest (5.11.3, 5.9.0)
msgpack (1.3.0)
multi_json (1.13.1)
multipart-post (2.1.1)
net-telnet (0.1.1)
netrc (0.11.0)
oj (3.8.1)
power_assert (0.2.7)
prometheus-client (0.9.0)
psych (2.1.0)
public_suffix (3.1.1)
quantile (0.2.1)
rake (10.5.0)
rdoc (4.2.1)
recursive-open-struct (1.0.0)
rest-client (2.0.2)
serverengine (2.1.1)
sigdump (0.2.4)
strptime (0.2.3)
systemd-journal (1.3.3)
test-unit (3.1.7)
thread_safe (0.3.6)
tzinfo (1.2.5)
tzinfo-data (1.2019.2)
unf (0.1.4)
unf_ext (0.0.7.6)
yajl-ruby (1.4.1)

timeout flush and can not match java stack multi

Problem

This is my Java exception log:
(screenshots omitted)

And this is my fluentd.conf:
(screenshot omitted)

And then I open Kibana to search my logs, but the log is not what I expect and it reports a timeout flush:
(screenshots omitted)

Your environment

fluent-plugin-concat 2.3.0
fluentd 1.3.0
docker 18.09.0
docker-compose 1.23.1

please help me.
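Since the screenshots did not survive, here is a hedged sketch of the kind of config typically used elsewhere in this thread for Java stack traces; the tag pattern, key, and regexps are assumptions about the log format:

<filter docker.**>
  @type concat
  key log
  stream_identity_key container_id
  # a new event starts with a timestamp; "at ...", "Caused by:" and "... N more" lines continue it
  multiline_start_regexp /^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}/
  continuous_line_regexp /^(\s+at |Caused by:|\s+\.\.\. \d+ more)/
  flush_interval 5
</filter>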

Simple multiline problem

Hello.

I have a simple scenario: I have multiline input and I want to save it as one record. Unfortunately I cannot do it.

Environment:

2019-06-17 13:10:06 +0000 [info]: starting fluentd-1.5.2 pid=7 ruby="2.5.5"
2019-06-17 13:10:06 +0000 [info]: spawn command to main:  cmdline=["/usr/bin/ruby", "-Eascii-8bit:ascii-8bit", "/usr/bin/fluentd", "-c", "/fluentd/etc/fluent.conf", "-p", "/fluentd/plugins", "--under-supervisor"]
2019-06-17 13:10:07 +0000 [info]: gem 'fluent-plugin-concat' version '2.3.0'
2019-06-17 13:10:07 +0000 [info]: gem 'fluent-plugin-elasticsearch' version '3.5.2'
2019-06-17 13:10:07 +0000 [info]: gem 'fluent-plugin-grok-parser' version '2.5.1'
2019-06-17 13:10:07 +0000 [info]: gem 'fluent-plugin-kafka' version '0.9.6'
2019-06-17 13:10:07 +0000 [info]: gem 'fluent-plugin-kubernetes_metadata_filter' version '2.2.0'
2019-06-17 13:10:07 +0000 [info]: gem 'fluentd' version '1.5.2'
2019-06-17 13:10:07 +0000 [info]: adding match pattern="xxx" type="stdout"
2019-06-17 13:10:07 +0000 [info]: adding filter pattern="es" type="concat"
2019-06-17 13:10:07 +0000 [info]: adding match pattern="es" type="copy"
2019-06-17 13:10:07 +0000 [warn]: #0 [out_es] Detected ES 7.x or above: `_doc` will be used as the document `_type`.
2019-06-17 13:10:07 +0000 [info]: adding source type="forward"
2019-06-17 13:10:07 +0000 [info]: #0 starting fluentd worker pid=15 ppid=7 worker=0
2019-06-17 13:10:07 +0000 [info]: #0 listening port port=24224 bind="0.0.0.0"
2019-06-17 13:10:07 +0000 [info]: #0 fluentd worker is now running worker=0


This is the config that I thought should work.
Config:


<source>
  @type forward
  port 24224
  bind 0.0.0.0
  tag es
</source>

<match xxx>
  @type stdout
</match>

<filter es>
  @type concat
  key log
  stream_identity_key container_id
  flush_interval 3s
  #n_lines 3
  #multiline_start_regexp /^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}/
  #multiline_end_regexp /^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}/
  multiline_start_regexp /^2019/
  #multiline_end_regexp /^2019/
</filter>

Test case:

docker run --log-driver=fluentd --log-opt tag="docker.{{.ID}}" ubuntu echo '2019-06-17 12:41:44,827 - Hello Fluentd!
XXXXXXXXXXXXXX
YYYYYYYYYYYY'

Current fluentd output.

2019-06-17 13:10:12 +0000 [warn]: #0 dump an error event: error_class=Fluent::Plugin::ConcatFilter::TimeoutError error="Timeout flush: es:281f3b726ee01d1df25e67084c329969523e03533a1490eeb4b9654ba7b36f0d" location=nil tag="es" time=2019-06-17 13:10:12.377438826 +0000 record={"log"=>"2019-06-17 12:41:44,827 - Hello Fluentd!\nXXXXXXXXXXXXXX\nYYYYYYYYYYYY", "container_id"=>"281f3b726ee01d1df25e67084c329969523e03533a1490eeb4b9654ba7b36f0d", "container_name"=>"/blissful_yonath", "source"=>"stdout"}
2019-06-17 13:10:12 +0000 [info]: #0 Timeout flush: es:281f3b726ee01d1df25e67084c329969523e03533a1490eeb4b9654ba7b36f0d

I thought it would merge these three lines without any errors, but it doesn't; it raises a timeout instead.

Do you know how I can fix it so that these three lines are combined into one fluentd record?

Not able to combine java stack trace with the previous line as single event in elastic search

Problem

I am using fluentd to send Spring Boot logs to Elasticsearch. I am not able to combine a Java stack trace with the previous line as a single event in Elasticsearch.
I tried using the concat plugin as well as the detect_exceptions plugin.
...

Steps to replicate

input:

2019-09-27 11:34:06.208 +0000 [http-nio-exec-9 ] [ERROR] c.l.u.e.LcValidationErrorHandler clientIP=xxxx- Unknown Exception
org.springframework.dao.DataIntegrityViolationException: could not execute statement; SQL [n/a]; constraint [null]; nested exception is org.hibernate.exception.ConstraintViolationException: could not execute statement
        at org.springframework.orm.jpa.vendor.HibernateJpaDialect.convertHibernateAccessException(HibernateJpaDialect.java:278)
        at org.springframework.orm.jpa.vendor.HibernateJpaDialect.translateExceptionIfPossible(HibernateJpaDialect.java:2

fluentd conf:

<source>
  @type tail
  path /error.log
  pos_file /error.pos
  tag elastic.access
  <parse>
    @type grok
    <grok>
      pattern %{TIMESTAMP_ISO8601:timestamp} %{DATA:thread}\[%{LOGLEVEL:level}\] %{NOTSPACE:provider} sourceId=%{GREEDYDATA:sourceId} corrId=%{GREEDYDATA:corrId} clientIP=%{IPORHOST:clientIP} %{GREEDYDATA:msg}
      time_format %{TIMESTAMP_ISO8601}
    </grok>
    <grok>
      pattern %{GREEDYDATA:msg}
    </grok>
  </parse>
  @label @CONCAT
</source>
<label @CONCAT>
  <filter **>
    @type concat
    key msg
    multiline_start_regexp /Unknown Exception/
    continuous_line_regexp /^(\s+at|Caused by:|\s+...|org.|com.)/
    flush_interval 3s
    timeout_label @NORMAL
  </filter>
  <match **>
    @type aws-elasticsearch-service
    type_name "access_log"
    logstash_format true
    include_tag_key true
    tag_key "@log_name"
    flush_interval 1s
    <buffer>
      flush_thread_count 4
    </buffer>
    <endpoint>
      url https://xxx.es.amazonaws.com
      region us-west-2
    </endpoint>
  </match>
</label>

Output

msg:- Unknown Exception org.springframework.dao.DataIntegrityViolationException: could not execute statement; SQL [n/a]; constraint [null]; nested exception is org.hibernate.exception.ConstraintViolationException: could not execute statement at org.springframework.orm.jpa.vendor.HibernateJpaDialect.convertHibernateAccessException(HibernateJpaDialect.java:278)

Expected Behavior

msg:- Unknown Exception timestamp:8237-09-27 11:34:06.208 thread:+0000 [http-nio-exec-9 ] level:ERROR provider:c.l.u.e.LcValidationErrorHandler  clientIP:xxx @timestamp:September 30th 2019, 21:39:48.023 @log_name:access _id:WyOehW0B3OvYvndt8-Ek _type:access_log _index:logstash-2019.10.01
org.springframework.dao.DataIntegrityViolationException: could not execute statement; SQL [n/a]; constraint [null]; nested exception is org.hibernate.exception.ConstraintViolationException: could not execute statement at org.springframework.orm.jpa.vendor.HibernateJpaDialect.convertHibernateAccessException(HibernateJpaDialect.java:278)

Your environment

  • OS version
    CentOS Linux 7 (Core)
  • paste result of fluentd --version or td-agent --version
    fluentd 1.7.1
  • plugin version
    fluent-plugin-concat' version '2.4.0'
    • paste boot log of fluentd or td-agent
    • paste result of fluent-gem list, td-agent-gem list or your Gemfile.lock

Log lines not being joined

Problem

My log lines output by Crystal's logger are not being joined.

Steps to replicate

Example config:

<source>
  @type tail
  format none
  path /home/baylinz/rolltrax_web/logs/*.log
  pos_file /var/lib/google-fluentd/pos/rolltrax_web.pos
  read_from_head true
  tag raw.crystal
</source>

<filter raw.crystal>
  @type concat
  key log
  multiline_start_regexp /^.*ERROR/
  multiline_end_regexp /^.*from \?\?\?/
</filter>

Example log:

E, [2019-08-14 18:20:36 +00:00 #8285] ERROR -- : duplicate key value violates unique constraint "users_email_key"
  from lib/clear/src/clear/sql/insert_query.cr:62:5 in 'execute'
  from lib/clear/src/clear/model/modules/has_saving.cr:112:20 in 'save'
  from lib/clear/src/clear/model/modules/has_saving.cr:137:55 in 'save!'
  from src/routes/api/generic/users.cr:61:7 in '->'
  from lib/kemal/src/kemal/route.cr:255:3 in '->'
  from lib/kemal/src/kemal/route_handler.cr:255:3 in 'process_request'
  from lib/kemal/src/kemal/route_handler.cr:17:7 in 'call'
  from /usr/share/crystal/src/http/server/handler.cr:24:7 in 'call_next'
  from lib/kemal/src/kemal/websocket_handler.cr:13:14 in 'call'
  from /usr/share/crystal/src/http/server/handler.cr:24:7 in 'call_next'
  from lib/kemal/src/kemal/filter_handler.cr:21:7 in 'call'
  from /usr/share/crystal/src/http/server/handler.cr:24:7 in 'call_next'
  from lib/kemal/src/kemal/static_file_handler.cr:14:11 in 'call'
  from /usr/share/crystal/src/http/server/handler.cr:24:7 in 'call_next'
  from lib/kemal/src/kemal/exception_handler.cr:8:7 in 'call'
  from /usr/share/crystal/src/http/server/handler.cr:24:7 in 'call_next'
  from lib/kemal/src/kemal/log_handler.cr:10:35 in 'call'
  from /usr/share/crystal/src/http/server/handler.cr:24:7 in 'call_next'
  from lib/kemal/src/kemal/init_handler.cr:12:7 in 'call'
  from /usr/share/crystal/src/http/server/request_processor.cr:39:11 in 'process'
  from /usr/share/crystal/src/http/server/request_processor.cr:16:3 in 'process'
  from /usr/share/crystal/src/http/server.cr:414:5 in 'handle_client'
  from /usr/share/crystal/src/http/server.cr:380:13 in '->'
  from /usr/share/crystal/src/fiber.cr:255:3 in 'run'
  from /usr/share/crystal/src/fiber.cr:75:34 in '->'
  from ???

Output

Separate log lines, i.e. the stack trace is not joined into one event. (screenshot omitted)
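A possible cause, offered as a hedged guess: with format none, in_tail stores each line under the message key by default, so the filter's key log may never see the text. Either point the filter at message, or rename the key in the source; a sketch assuming the none parser's message_key option:

<source>
  @type tail
  format none
  # store the raw line under "log" so the concat filter's "key log" sees it
  message_key log
  path /home/baylinz/rolltrax_web/logs/*.log
  pos_file /var/lib/google-fluentd/pos/rolltrax_web.pos
  read_from_head true
  tag raw.crystal
</source>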

Plugin is concatenating multiple lines into a single event

Problem

...the plugin is concatenating multiple lines into a single event.

Steps to replicate

Config:

<source>
  @id fluentd-containers-flink.log
  @type tail
  path /var/log/containers/**flink**.log
  pos_file /var/log/fluentd-containers-flink.log.pos
  time_format %Y-%m-%dT%H:%M:%S.%NZ
  tag raw.kubernetes.*
  format json
  @label @CONCAT_FLINK
</source>

<label @CONCAT_FLINK>
  <filter **flink**>
    @type concat
    key log
    stream_identity_key container_id
    multiline_start_regexp /\d{4}-\d{1,2}-\d{1,2}/
    continuous_line_regexp /Caused\sby:|\s+at\s.*|^java|\s+ \.\.\. (\d)+ more/
    #multiline_end_regexp /\s+.*more$/
    flush_interval 10
    timeout_label @OUTPUT
  </filter>
  <filter **flink**>
    @type parser
    key_name log
    reserve_data true
    <parse>
      @type regexp
      expression /^(?<log_date>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\s(?<log_level>\w+)\s+(?<log_logger>.*?\s)\s+-(?<log_message>.*)/m
    </parse>
  </filter>

  <match>
   @type relabel
   @label @OUTPUT
  </match>

Log line:

{"log":"2019-02-08 10:03:50,885 WARN  akka.remote.transport.netty.NettyTransport                    - Remote connection to [/172.21.0.106:35000] failed with org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException: Adjusted frame length exceeds 40485760: 1195725860 - discarded\n","stream":"stdout","time":"2019-02-08T10:03:50.885661556Z"}
{"log":"2019-02-08 10:03:51,670 ERROR org.apache.flink.runtime.blob.BlobServerConnection            - Error while executing BLOB connection.\n","stream":"stdout","time":"2019-02-08T10:03:51.671149184Z"}
{"log":"java.io.IOException: Unknown operation 71\n","stream":"stdout","time":"2019-02-08T10:03:51.671201184Z"}
{"log":"\u0009at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:120)\n","stream":"stdout","time":"2019-02-08T10:03:51.671218384Z"}
{"log":"2019-02-08 10:04:05,884 WARN  akka.remote.transport.netty.NettyTransport                    - Remote connection to [/172.21.0.106:35602] failed with org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException: Adjusted frame length exceeds 40485760: 1195725860 - discarded\n","stream":"stdout","time":"2019-02-08T10:04:05.884977614Z"}
{"log":"2019-02-08 10:04:06,670 ERROR org.apache.flink.runtime.blob.BlobServerConnection            - Error while executing BLOB connection.\n","stream":"stdout","time":"2019-02-08T10:04:06.671248435Z"}
{"log":"java.io.IOException: Unknown operation 71\n","stream":"stdout","time":"2019-02-08T10:04:06.671281135Z"}
{"log":"\u0009at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:120)\n","stream":"stdout","time":"2019-02-08T10:04:06.671293435Z"}
{"log":"2019-02-08 10:04:20,885 WARN  akka.remote.transport.netty.NettyTransport                    - Remote connection to [/172.21.0.106:36216] failed with org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException: Adjusted frame length exceeds 40485760: 1195725860 - discarded\n","stream":"stdout","time":"2019-02-08T10:04:20.886034327Z"}
{"log":"2019-02-08 10:04:21,671 ERROR org.apache.flink.runtime.blob.BlobServerConnection            - Error while executing BLOB connection.\n","stream":"stdout","time":"2019-02-08T10:04:21.672123047Z"}
{"log":"java.io.IOException: Unknown operation 71\n","stream":"stdout","time":"2019-02-08T10:04:21.672146847Z"}
{"log":"\u0009at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:120)\n","stream":"stdout","time":"2019-02-08T10:04:21.672151847Z"}

Current O/P:

Currently the log key contains both entries:

2019-02-08 09:58:06,179 WARN  akka.remote.transport.netty.NettyTransport                    - Remote connection to [/172.21.0.106:35690] failed with org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException: Adjusted frame length exceeds 40485760: 1195725860 - discarded
2019-02-08 09:58:10,515 ERROR org.apache.flink.runtime.blob.BlobServerConnection            - Error while executing BLOB connection.

java.io.IOException: Unknown operation 71

	at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:120)

Expected Behavior

Expect both entries to remain separate events.

Your environment

  • OS version - Kubernetes
  • paste result of fluentd --version or td-agent --version - fluentd 1.2.4
  • plugin version
    • paste boot log of fluentd or td-agent
    • paste result of fluent-gem list, td-agent-gem list or your Gemfile.lock

fluent-plugin-concat (2.3.0)
fluent-plugin-detect-exceptions (0.0.11)
fluent-plugin-elasticsearch (2.11.5)
fluent-plugin-grok-parser (2.4.0)
fluent-plugin-kubernetes_metadata_filter (2.0.0)
fluent-plugin-multi-format-parser (1.0.0)
fluent-plugin-prometheus (1.0.1)
fluent-plugin-systemd (1.0.1)

deadlock error

Problem

If I use multiline detection, it works, but I get "deadlock; recursive locking" errors in the fluentd logs.

(concat plugin is used as part of Splunk connect for Kubernetes https://github.com/splunk/splunk-connect-for-kubernetes)

Steps to replicate

Log:

2019-03-18 06:06:48.859  INFO [manage-xxx-service,,,] [10.2.7.19] 1 --- [-15276-thread-1] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values:
        auto.commit.interval.ms = 5000
        auto.offset.reset = latest
        bootstrap.servers = [my-kafka-service:9092]
        check.crcs = true

Config:

      <filter tail.containers.var.log.containers.*.log>
        @type concat
        key log
        timeout_label @SPLUNK
        stream_identity_key stream
        multiline_start_regexp /^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}.\d{3}/
        flush_interval 5s
      </filter>

Expected Behavior

It works (the multiline message appears as a single event in Splunk), but I see many errors in the fluentd logs; I believe one error every 5 seconds for every log file:

2019-03-18 05:13:54 +0000 [warn]: #0 dump an error event: error_class=ThreadError error="deadlock; recursive locking" location="/usr/local/bundle/gems/fluent-plugin-concat-2.3.0/lib/fluent/plugin/filter_concat.rb:144:in `synchronize'" tag="tail.containers.var.log.containers.manage-xxx-service-85855985fc-pgl6g_yyy_manage-incident-service-0ee1814dcd3596c96e0bf6c0a2e65a9437cf1b282a95daf41fbd6e8933df1f8f.log" time=

Your environment

  • Kubernetes
  • Splunk connect for kubernetes

See original ticket splunk/splunk-connect-for-kubernetes#111

Losing log info due to timeout flush

I have a situation where log information is getting lost due to timeout errors. I have set the flush interval to 30 seconds but I am still getting timeout flushes...

I am sending the following log information to Fluentd:

2016-06-02 12:32:53.077  INFO 1 --- [           main] content line 1
2016-06-02 12:32:53.097  INFO 1 --- [           main] content line 2
2016-06-02 12:32:53.371  INFO 1 --- [           main] content line 3
2016-06-02 12:32:54.368  INFO 1 --- [           main] content line 4
2016-06-02 12:32:54.858  INFO 1 --- [           main] content line 5
2016-06-02 12:32:54.862  INFO 1 --- [           main] content line 6
2016-06-02 12:32:55.089  INFO 1 --- [           main] content line 7

This is the result in the console of Fluentd:

2016-06-02 12:32:53 +0000 content line 1
2016-06-02 12:32:53 +0000 [warn]: dump an error event: error_class=Fluent::ConcatFilter::TimeoutError error="Timeout flush: --> for content line 2
2016-06-02 12:32:53 +0000 [info]: Timeout flush: --> for content line 2
2016-06-02 12:32:53 +0000 content line 3
2016-06-02 12:32:54 +0000 [warn]: dump an error event: error_class=Fluent::ConcatFilter::TimeoutError error="Timeout flush: --> for content line 4
2016-06-02 12:32:54 +0000 [info]: Timeout flush: --> for content line 4
2016-06-02 12:32:54 +0000 content line 5
2016-06-02 12:32:54 +0000 content line 6
2016-06-02 12:32:55 +0000 [warn]: dump an error event: error_class=Fluent::ConcatFilter::TimeoutError error="Timeout flush: --> for content line 7
2016-06-02 12:32:55 +0000 [info]: Timeout flush: --> for content line 7

So it seems that I am losing log information (lines 2 / 4 / 7), or is there another way to pick up the flushed information?

My configuration for fluentd is:

<source>
  @type forward
  port 24224
  bind 0.0.0.0
</source>
<filter *.docker.*>
  @type concat
  key log
  stream_identity_key container_id
  multiline_start_regexp /^(\d{4})-(\d{2})-(\d{2}) (\d{2}):(\d{2}):(\d{2})[^\s]+/
  flush_interval 30s
</filter>
<match *.docker.*>
    @type stdout
</match>

It seems that I am having this issue every second; I could not find a reason for it.

Optionally Disable `Timeout Error Event`

I was reading other issues about timeout errors and I think it's more natural to dispatch the message instead of emitting an error event.

Example:

<filter docker.**>
    type concat
    key log
    stream_identity_key container_id
    multiline_start_regexp /^[0-9]/
    flush_interval 5
    disable_timeout true    <------- This could help so much :D
</filter>

I'd like to submit a pull request but I am a little useless in ruby world...

unknown option 'required' for configuration parameter: key

<filter App1.*>
  @type concat
  key log
  stream_identity_key container_id
  multiline_start_regexp /^App1/
  multiline_end_regexp /^App1/
</filter>

Log lines from the application start with "App1 - ".
Starting up fluentd (via docker-compose) results in the following error:

fluentd          | 2017-02-01 23:59:37 +0000 [error]: #0 unexpected error error_class=ArgumentError error="unknown option 'required' for configuration parameter: key"
fluentd          |   2017-02-01 23:59:37 +0000 [error]: #0 /usr/lib/ruby/gems/2.3.0/gems/fluentd-0.14.12/lib/fluent/config/configure_proxy.rb:244:in `block in config_parameter_option_validate!'
fluentd          |   2017-02-01 23:59:37 +0000 [error]: #0 /usr/lib/ruby/gems/2.3.0/gems/fluentd-0.14.12/lib/fluent/config/configure_proxy.rb:233:in `each_key'
fluentd          |   2017-02-01 23:59:37 +0000 [error]: #0 /usr/lib/ruby/gems/2.3.0/gems/fluentd-0.14.12/lib/fluent/config/configure_proxy.rb:233:in `config_parameter_option_validate!'
fluentd          |   2017-02-01 23:59:37 +0000 [error]: #0 /usr/lib/ruby/gems/2.3.0/gems/fluentd-0.14.12/lib/fluent/config/configure_proxy.rb:250:in `parameter_configuration'

I don't expect those errors, but rather an instance that will parse my multiple entries into a single entry, provided each entry begins with "App1 -".

No way to flush multiline events

There's no way to specify a flush interval, which means that a multiline event is only ever delivered after the next event is delivered. This can cause some headaches in production - if you log a multiline error, fluentd won't output it until at least one more message has been logged.

For example, the following config and input will hang forever with no output

<source>
  type tail
  path /var/log/*.log
  pos_file /var/log/varlog.log.pos
  format json
  tag jsonlog.*
  read_from_head true
</source>

<filter jsonlog.**>
  @type concat
  key log
  stream_identity_key container_id
  multiline_start_regexp /^\S+/
</filter>

<match **>
  type stdout
</match>
{"log":"Error connecting to database...","stream":"stdout","time":"2016-04-15T19:25:46.655449074Z", "container_id": "1234"}
{"log":"  sadly this stacktrace","stream":"stdout","time":"2016-04-15T19:25:46.655490401Z", "container_id": "1234"}
{"log":"  and associated error message","stream":"stdout","time":"2016-04-15T19:25:46.655498254Z", "container_id": "1234"}
{"log":"  will not be delivered for an indeterminate time","stream":"stdout","time":"2016-04-15T19:25:46.65550259Z", "container_id": "1234"}

Some sort of background job should run through all the in-flight multiline buffers and flush them after a timeout has expired.
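For reference, the flush_interval option that appears elsewhere in this thread does exactly this: a background timer flushes any in-flight buffer once the interval has expired. A minimal sketch on top of the config above:

<filter jsonlog.**>
  @type concat
  key log
  stream_identity_key container_id
  multiline_start_regexp /^\S+/
  # flush a buffer that has been idle for 15 seconds instead of waiting for the next event
  flush_interval 15
</filter>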

Bug: Line matching both start and end regex can break timeout

Conditions:

  • Buffer contains message
  • Buffer times out; contents of buffer emitted
  • Line arrives which matches both multiline_start_regexp and multiline_end_regexp
  • Another line arrives

Expected behavior:

  1. When timeout expires, last line will be emitted

Observed behavior:

  1. Last line will not be emitted at all until bumped by another incoming line

Reproduction:

  • td-agent.conf:
<system>
    log_level trace
</system>

<source>
    @type tail
    tag test

    path '/var/log/td-agent/test.log'

    read_from_head true

    format none
</source>


<filter test.**>
    @type record_transformer

    <record>
        label 'Pre-concat'
    </record>
</filter>
<filter test.**>
    @type stdout
</filter>


<filter test.**>
    @type concat

    key message

    multiline_start_regexp /Start/
    multiline_end_regexp   /End/

    flush_interval 10
    timeout_label @FLUSH
    use_first_timestamp true
</filter>


<filter test.**>
    @type record_transformer

    <record>
        label 'Concat-out'
    </record>
</filter>
<filter test.**>
    @type stdout
</filter>
<match **>
    @type null
</match>


<label @FLUSH>
    <filter test.**>
        @type record_transformer

        <record>
            label 'Concat-flush'
        </record>
    </filter>
    <filter test.**>
        @type stdout
    </filter>
    <match **>
        @type null
    </match>
</label>
  • Test log generator '/etc/td-agent/test.sh':
#!/usr/bin/env bash

echo "$(date) Start (Will time out)"
sleep 15
echo "$(date) Start End (Will return immediately)"
sleep 15
echo "$(date) Start (Will never time out)"
sleep 30
echo "$(date) Start (See?)"
  • Execution:
touch '/var/log/td-agent/test.log' && chown td-agent:td-agent '/var/log/td-agent/test.log'
service td-agent start
sh /etc/td-agent/test.sh >> '/var/log/td-agent/test.log'
  • Output:
2016-08-22 17:39:35 -0400 [info]: reading config file path="/etc/td-agent/td-agent.conf"
2016-08-22 17:39:35 -0400 [info]: starting fluentd-0.12.26
2016-08-22 17:39:35 -0400 [trace]: registered buffer plugin 'file'
2016-08-22 17:39:35 -0400 [trace]: registered buffer plugin 'memory'
2016-08-22 17:39:35 -0400 [trace]: registered filter plugin 'grep'
2016-08-22 17:39:35 -0400 [trace]: registered filter plugin 'record_transformer'
2016-08-22 17:39:35 -0400 [trace]: registered filter plugin 'stdout'
2016-08-22 17:39:35 -0400 [trace]: registered input plugin 'debug_agent'
2016-08-22 17:39:35 -0400 [trace]: registered input plugin 'dummy'
2016-08-22 17:39:35 -0400 [trace]: registered input plugin 'exec'
2016-08-22 17:39:35 -0400 [trace]: registered input plugin 'forward'
2016-08-22 17:39:35 -0400 [trace]: registered input plugin 'gc_stat'
2016-08-22 17:39:35 -0400 [trace]: registered input plugin 'http'
2016-08-22 17:39:35 -0400 [trace]: registered input plugin 'monitor_agent'
2016-08-22 17:39:35 -0400 [trace]: registered input plugin 'object_space'
2016-08-22 17:39:35 -0400 [trace]: registered input plugin 'status'
2016-08-22 17:39:35 -0400 [trace]: registered input plugin 'unix'
2016-08-22 17:39:35 -0400 [trace]: registered input plugin 'syslog'
2016-08-22 17:39:35 -0400 [trace]: registered input plugin 'tail'
2016-08-22 17:39:35 -0400 [trace]: registered input plugin 'tcp'
2016-08-22 17:39:35 -0400 [trace]: registered input plugin 'udp'
2016-08-22 17:39:35 -0400 [trace]: registered output plugin 'copy'
2016-08-22 17:39:35 -0400 [trace]: registered output plugin 'exec'
2016-08-22 17:39:35 -0400 [trace]: registered output plugin 'exec_filter'
2016-08-22 17:39:35 -0400 [trace]: registered output plugin 'file'
2016-08-22 17:39:35 -0400 [trace]: registered output plugin 'forward'
2016-08-22 17:39:35 -0400 [trace]: registered output plugin 'null'
2016-08-22 17:39:35 -0400 [trace]: registered output plugin 'relabel'
2016-08-22 17:39:35 -0400 [trace]: registered output plugin 'roundrobin'
2016-08-22 17:39:35 -0400 [trace]: registered output plugin 'stdout'
2016-08-22 17:39:35 -0400 [trace]: registered output plugin 'tcp'
2016-08-22 17:39:35 -0400 [trace]: registered output plugin 'unix'
2016-08-22 17:39:35 -0400 [info]: gem 'fluent-mixin-config-placeholders' version '0.4.0'
2016-08-22 17:39:35 -0400 [info]: gem 'fluent-mixin-plaintextformatter' version '0.2.6'
2016-08-22 17:39:35 -0400 [info]: gem 'fluent-plugin-concat' version '0.6.0'
2016-08-22 17:39:35 -0400 [info]: gem 'fluent-plugin-concat' version '0.5.0'
2016-08-22 17:39:35 -0400 [info]: gem 'fluent-plugin-forest' version '0.3.1'
2016-08-22 17:39:35 -0400 [info]: gem 'fluent-plugin-mongo' version '0.7.13'
2016-08-22 17:39:35 -0400 [info]: gem 'fluent-plugin-multi-format-parser' version '0.0.2'
2016-08-22 17:39:35 -0400 [info]: gem 'fluent-plugin-rewrite-tag-filter' version '1.5.5'
2016-08-22 17:39:35 -0400 [info]: gem 'fluent-plugin-s3' version '0.6.8'
2016-08-22 17:39:35 -0400 [info]: gem 'fluent-plugin-sar' version '0.0.4'
2016-08-22 17:39:35 -0400 [info]: gem 'fluent-plugin-scribe' version '0.10.14'
2016-08-22 17:39:35 -0400 [info]: gem 'fluent-plugin-secure-forward' version '0.4.3'
2016-08-22 17:39:35 -0400 [info]: gem 'fluent-plugin-systemd' version '0.0.3'
2016-08-22 17:39:35 -0400 [info]: gem 'fluent-plugin-tail-multiline' version '0.1.5'
2016-08-22 17:39:35 -0400 [info]: gem 'fluent-plugin-td' version '0.10.28'
2016-08-22 17:39:35 -0400 [info]: gem 'fluent-plugin-td-monitoring' version '0.2.2'
2016-08-22 17:39:35 -0400 [info]: gem 'fluent-plugin-webhdfs' version '0.4.2'
2016-08-22 17:39:35 -0400 [info]: gem 'fluentd' version '0.12.26'
2016-08-22 17:39:35 -0400 [info]: gem 'fluentd' version '0.10.61'
2016-08-22 17:39:35 -0400 [info]: adding filter in @FLUSH pattern="test.**" type="record_transformer"
2016-08-22 17:39:36 -0400 [info]: adding filter in @FLUSH pattern="test.**" type="stdout"
2016-08-22 17:39:36 -0400 [info]: adding match in @FLUSH pattern="**" type="null"
2016-08-22 17:39:36 -0400 [info]: adding filter pattern="test.**" type="record_transformer"
2016-08-22 17:39:36 -0400 [info]: adding filter pattern="test.**" type="stdout"
2016-08-22 17:39:36 -0400 [info]: adding filter pattern="test.**" type="concat"
2016-08-22 17:39:36 -0400 [trace]: registered filter plugin 'concat'
2016-08-22 17:39:36 -0400 [info]: adding filter pattern="test.**" type="record_transformer"
2016-08-22 17:39:36 -0400 [info]: adding filter pattern="test.**" type="stdout"
2016-08-22 17:39:36 -0400 [info]: adding match pattern="**" type="null"
2016-08-22 17:39:36 -0400 [info]: adding source type="tail"
2016-08-22 17:39:36 -0400 [warn]: 'pos_file PATH' parameter is not set to a 'tail' source.
2016-08-22 17:39:36 -0400 [warn]: this parameter is highly recommended to save the position to resume tailing.
2016-08-22 17:39:36 -0400 [info]: using configuration file: <ROOT>
  <system>
    log_level trace
  </system>
  <source>
    @type tail
    tag test
    path /var/log/td-agent/test.log
    read_from_head true
    format none
  </source>
  <filter test.**>
    @type record_transformer
    <record>
      label Pre-concat
    </record>
  </filter>
  <filter test.**>
    @type stdout
  </filter>
  <filter test.**>
    @type concat
    key message
    multiline_start_regexp /Start/
    multiline_end_regexp /End/
    flush_interval 10
    timeout_label @FLUSH
    use_first_timestamp true
  </filter>
  <filter test.**>
    @type record_transformer
    <record>
      label Concat-out
    </record>
  </filter>
  <filter test.**>
    @type stdout
  </filter>
  <match **>
    @type null
  </match>
  <label @FLUSH>
    <filter test.**>
      @type record_transformer
      <record>
        label Concat-flush
      </record>
    </filter>
    <filter test.**>
      @type stdout
    </filter>
    <match **>
      @type null
    </match>
  </label>
</ROOT>
2016-08-22 17:39:36 -0400 [info]: following tail of /var/log/td-agent/test.log
2016-08-22 17:39:40 -0400 test: {"message":"Mon Aug 22 17:39:40 EDT 2016 Start (Will time out)","label":"Pre-concat"}
2016-08-22 17:39:50 -0400 test: {"message":"Mon Aug 22 17:39:40 EDT 2016 Start (Will time out)","label":"Concat-flush"}
2016-08-22 17:39:50 -0400 [info]: Timeout flush: test:default
2016-08-22 17:39:55 -0400 test: {"message":"Mon Aug 22 17:39:55 EDT 2016 Start End (Will return immediately)","label":"Pre-concat"}
2016-08-22 17:39:55 -0400 test: {"message":"Mon Aug 22 17:39:55 EDT 2016 Start End (Will return immediately)","label":"Concat-out"}
2016-08-22 17:40:10 -0400 test: {"message":"Mon Aug 22 17:40:10 EDT 2016 Start (Will never time out)","label":"Pre-concat"}
2016-08-22 17:40:40 -0400 test: {"message":"Mon Aug 22 17:40:40 EDT 2016 Start (See?)","label":"Pre-concat"}
2016-08-22 17:40:40 -0400 test: {"message":"Mon Aug 22 17:40:10 EDT 2016 Start (Will never time out)","label":"Concat-out"}
