Data going into wrong index: why?


(Chandukreddi) #1

Hello Experts,

Different types of logs are going into a single index, when each type should go to its own newly defined index.

I am distinguishing the two log files with tags and then filtering on those tags in Logstash.

My filebeat.yaml config:

  - type: log
    enabled: true
    paths:
      - /xxx/system.log.2018-03-18_2148
    tags: ["cassandra_test"]
    exclude_lines: ['^DBG']
    fields:
      app_id: cassandralog
    multiline.pattern: "^INFO|^WARN|^ERROR|^FATAL"
    multiline.negate: true
    multiline.match: after
    add_locale: ~
  - type: log
    paths:
      - /apth/all.log.2
    tags: ["swift_proxy_test"]
    fields:
      app_id: swiftproxylog
    multiline.pattern: "^Jan|^Feb|^Mar|^Apr|^May|^Jun|^Jul|^Aug|^Sep|^Oct|^Nov|^Dec"
    multiline.negate: true
    multiline.match: after

The output goes to Logstash.

In Logstash I have two config files, one for cassandralog and another for swiftproxylog.

Cassandra.conf
input {
  beats {
    port => 5044
  }
}

filter {
  if "cassandra_test" in [tags] {
    grok {
      patterns_dir => ["path"]
      break_on_match => true
      match => {
        "message" => [
          #"%{CASS_COMPACTION_LARGE_KEY}",
          "%{CASS_SLAB_POOL_CLEANER_1}",
          "%{CASS_SLAB_POOL_CLEANER_2}",
          # "%{CASS_MEMTABLE_FLUSH_START}",
          # "%{CASS_MEMTABLE_FLUSH_COMPLETE}",
          "%{CASS_BATCH_STATEMENT}",
          "%{CASS_SIMS_TOMBSTONE}",
          "%{CASS_COMPACTION_COMPLETE}",
          "%{CASS_GC_GRACE}",
          "%{CASS_SERVICE_THREAD_PENDING}"
        ]
      }
      add_tag => [ "cass_parsed" ]
    }
  }
}

output {
  elasticsearch {
    hosts => "host_ip:9200"
    index => "prd-log-%{+YYYY.MM.dd.HH}-000001"
    template => "cass_log_sizing_2.json"
    template_name => "cassandra_log"
    template_overwrite => true
  }
}

Swiftproxylog.conf

input {
  beats {
    port => 5044
  }
}

filter {
  if "swift_proxy_test" in [tags] {
    grok {
      patterns_dir => ["path"]
      break_on_match => true
      match => { "message" => [ "%{SWIFT_P_ALL}" ] }
      add_tag => [ "swift_all_parsed" ]
    }

    if "swift_all_parsed" not in [tags] {
      grok {
        patterns_dir => ["path"]
        match => { "message" => ["%{SWIFT_P_204_499}"] }
        add_tag => [ "swift_rest" ]
      }
    }
  }
}

output {
  elasticsearch {
    hosts => "host_ip:9200"
    index => "swift-proxy-log-%{+YYYY.MM.dd.HH}"
    manage_template => "false"
  }
}

Whenever I run Logstash with Cassandra.conf, the data from both files goes into the prd-log-* index, and when I run it with Swiftproxylog.conf, all the data goes into the swift-proxy-log-* index. Each log type is supposed to go to its own index, as defined by the tag filter and the elasticsearch output config.

One thing I did notice: even though everything lands in the same index, the tags on the events are correct. The only problem is that the data is mixed into one index.

Below are screenshots of the mixed data:

[Screenshot 1: this index shouldn't have this data]

[Screenshot 2: this index should have only this data, not the data above]

Please advise or correct me if I am doing anything wrong here.
Thanks
Chandra


#2

If you are putting the two .conf files into the same directory and pointing logstash -f at that directory, then it concatenates the files, reads all the inputs, applies all the filters to all the events, and then sends the events to every output.

You can make the output conditional on the tags.

output {
  if "foo" in [tags] {
    elasticsearch {
      index => "this"
      ....
    }
  } else {
    elasticsearch {
      index => "that"
      ...
    }
  }
}

(Chandukreddi) #3

Okay, let me try that.

But I am already doing that in the filter, right? Do you think that condition applies only to parsing, and not to the output?


#4

Definitely. A conditional in the filter does not apply to the output.
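
To make that concrete, here is a minimal sketch using the tags and index names from the first post. The grok body is cut down to a single pattern, and "path" and "host_ip" are the same placeholders the original config uses. The `if` inside `filter` only decides which events get parsed; it routes nothing, so the `output` section needs its own conditional:

```logstash
filter {
  # This conditional only decides WHICH events this grok is applied to.
  if "cassandra_test" in [tags] {
    grok {
      patterns_dir => ["path"]
      match => { "message" => [ "%{CASS_GC_GRACE}" ] }
      add_tag => [ "cass_parsed" ]
    }
  }
}

output {
  # Every event reaches the output section, parsed or not,
  # so the routing decision has to be made again here.
  if "cassandra_test" in [tags] {
    elasticsearch {
      hosts => "host_ip:9200"
      index => "prd-log-%{+YYYY.MM.dd.HH}-000001"
    }
  } else if "swift_proxy_test" in [tags] {
    elasticsearch {
      hosts => "host_ip:9200"
      index => "swift-proxy-log-%{+YYYY.MM.dd.HH}"
    }
  }
}
```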


(Chandukreddi) #5

@Badger

In that case, since I have two conf files in the same directory, is it okay if I put the output in one conf file with if and else if conditions?

And if I have output conditions in both conf files, does Logstash parse the events twice?

Thanks
Chandra


#6

Yes. Sometimes I use multiple conf files, one for each type of event (with conditionals based on a field or tag). So I might have one configuration file for IIS logs, one for Tomcat logs etc. Then I have one other file that defines the input and the output. There is one case where the output is something like

output {
  if [recordtype] in [ "this", "that", "theother" ] {
    ... one output
  } else if [recordtype] == "oranother" {
    ... another output
  } else {
    ... another output to catch everything left over
  }
}

If you have multiple conf files then logstash concatenates them into a single file. It is equivalent to doing "cat /path/to/configs/* > /tmp/file; logstash -f /tmp/file". This is useful to know if logstash complains it has a parsing error at line X, column Y of the config. You can just do that cat and vi the resulting file (or the equivalent on non-Linux platforms).

So having conditional outputs in two files results in it evaluating each condition, and if they are the same condition it will evaluate it twice, but it does not apply the filters twice.
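
As a rough sketch of that layout, the directory could hold four files like the ones below. The file names, the "unmatched" index name, and the single grok pattern per file are assumptions made up for illustration; "path" and "host_ip" are the placeholders from the original post. The numeric prefixes are only there because, as far as I know, the files are concatenated in name order, which matters when one filter depends on another.

```logstash
# --- 10-input.conf (hypothetical file name): the only input ---
input {
  beats { port => 5044 }
}

# --- 20-cassandra.conf (hypothetical): Cassandra parsing only ---
filter {
  if "cassandra_test" in [tags] {
    grok {
      patterns_dir => ["path"]
      match => { "message" => [ "%{CASS_GC_GRACE}" ] }
      add_tag => [ "cass_parsed" ]
    }
  }
}

# --- 30-swift.conf (hypothetical): Swift proxy parsing only ---
filter {
  if "swift_proxy_test" in [tags] {
    grok {
      patterns_dir => ["path"]
      match => { "message" => [ "%{SWIFT_P_ALL}" ] }
      add_tag => [ "swift_all_parsed" ]
    }
  }
}

# --- 90-output.conf (hypothetical): the only output section ---
output {
  if "cassandra_test" in [tags] {
    elasticsearch {
      hosts => "host_ip:9200"
      index => "prd-log-%{+YYYY.MM.dd.HH}-000001"
    }
  } else if "swift_proxy_test" in [tags] {
    elasticsearch {
      hosts => "host_ip:9200"
      index => "swift-proxy-log-%{+YYYY.MM.dd.HH}"
    }
  } else {
    # Catch-all so untagged events are easy to spot; index name is made up.
    elasticsearch {
      hosts => "host_ip:9200"
      index => "unmatched-%{+YYYY.MM.dd}"
    }
  }
}
```

With this split, each event passes through the input once, is parsed by at most one of the filter files, and is written by exactly one output branch.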


(Chandukreddi) #7

Thanks for detailed explanation @Badger

I did same way as you suggested.

  1. Here I have 2 .conf files in my config folder

[root@localhost config]# ls -ltr *.conf
-rw-r--r--. 1 root root 2743 Mar 20 15:24 swift_proxy.conf >>>>> Swift log parse
-rw-r--r--. 1 root root 5746 Mar 20 16:14 cassandra-new.conf >>>> Cassandra log parse

  2. These are my output conditions. The rest of the file is unchanged: it contains only the Cassandra log filters and patterns.

[root@localhost config]# tail -30f cassandra-new.conf

# changed the index parameter to an alias instead of an index name

output {
  if "cassandra_test" in [tags] {
    elasticsearch {
      hosts => "10.1.27.6:9200"
      index => "logs_write_cas"      # index alias
      template => "cass_log_sizing_2.json"
      template_name => "cassandra_log"
      template_overwrite => true
    }
  }
  else if "swift_proxy_test" in [tags] {
    elasticsearch {
      hosts => "10.1.27.6:9200"
      index => "logs_write_swift"    # index alias
      template => "swift_proxy_log_sizing_2.json"
      template_name => "swift_proxy_log"
      template_overwrite => true
    }
  }
}

  3. I am running ./bin/logstash -f /path/config/cassandra-new.conf. When I run this, it sends data to the defined indexes as per the logic, but the swift logs (which are in the else if condition) are not parsed; they are just sent to the index without parsing.

  4. Moreover, I am going to use a pipelines.yml file to configure the two conf files, and in that case I don't want the same log files (cas-log, swift-log) to be parsed by both conf files.

Can you suggest how I can do this?
Thanks
Chandra


#8

Then it is only using cassandra-new.conf. If you want it to use multiple configuration files run something like this, and make sure you just have the two files you want in that directory, with no backups or alternate versions of them.

./bin/logstash -f /path/config/

If the same beat is picking up both logs and writing them to the same place then there can only be one logstash beats input which consumes it, so that implies a single pipeline. However that can then forward logs based on a tag.

The pipeline with the beats input would not filter {}, just use a conditional to pick a tcp output. Then for each type of log have another pipeline which reads a tcp input, does the filter {} and unconditionally writes to elasticsearch.

So the first one would look something like

input { beats { port => 5044 } }
output {
  if "swift_proxy_test" in [tags] {
    tcp { host => "127.0.0.1" port => 11001 }
  } else {
    tcp { host => "127.0.0.1" port => 11002 }
  }
}
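
One of the downstream pipelines might then look roughly like the sketch below, here for the swift logs on port 11001. This is an illustration under assumptions, not a tested config: it assumes the forwarding tcp outputs above are also given `codec => json_lines` so that tags and fields survive the hop, it has to run as its own pipeline (for example via pipelines.yml) rather than in the same directory as the forwarder, and "path" and "host_ip" are the placeholders from the first post.

```logstash
# Hypothetical second pipeline: receives the swift events forwarded to port 11001.
input {
  tcp {
    host => "127.0.0.1"
    port => 11001
    codec => json_lines   # assumes the sending tcp output also uses json_lines
  }
}

# No conditional needed: only swift events are sent to this port.
filter {
  grok {
    patterns_dir => ["path"]
    match => { "message" => [ "%{SWIFT_P_ALL}" ] }
    add_tag => [ "swift_all_parsed" ]
  }
}

# Unconditional output, because this pipeline sees a single log type.
output {
  elasticsearch {
    hosts => "host_ip:9200"
    index => "swift-proxy-log-%{+YYYY.MM.dd.HH}"
    manage_template => false
  }
}
```

The Cassandra pipeline would be the same shape, listening on port 11002 with its own grok patterns and index.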

(Chandukreddi) #9

Hi @Badger

I tried your first option, but I am running into the error below.

[root@localhost config]# /opt/logstash-6.1.3/bin/logstash -f /opt/logstash-6.1.3/config/
2018-03-21 09:07:38,495 main ERROR Unable to locate appender "${sys:ls.log.format}_console" for logger config "root"
2018-03-21 09:07:38,496 main ERROR Unable to locate appender "${sys:ls.log.format}_rolling" for logger config "root"
2018-03-21 09:07:38,497 main ERROR Unable to locate appender "${sys:ls.log.format}_rolling_slowlog" for logger config "slowlog"
2018-03-21 09:07:38,497 main ERROR Unable to locate appender "${sys:ls.log.format}_console_slowlog" for logger config "slowlog"
2018-03-21 09:07:40,805 main ERROR Unable to locate appender "${sys:ls.log.format}_console" for logger config "root"
2018-03-21 09:07:40,805 main ERROR Unable to locate appender "${sys:ls.log.format}_rolling" for logger config "root"
2018-03-21 09:07:40,806 main ERROR Unable to locate appender "${sys:ls.log.format}_rolling_slowlog" for logger config "slowlog"
2018-03-21 09:07:40,806 main ERROR Unable to locate appender "${sys:ls.log.format}_console_slowlog" for logger config "slowlog"
Sending Logstash's logs to /opt/data/logs which is now configured via log4j2.properties
[2018-03-21T09:07:41,123][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/opt/logstash-6.1.3/modules/fb_apache/configuration"}
[2018-03-21T09:07:41,141][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/opt/logstash-6.1.3/modules/netflow/configuration"}
[2018-03-21T09:07:41,576][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2018-03-21T09:07:42,188][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.1.3"}
[2018-03-21T09:07:42,684][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2018-03-21T09:07:42,902][ERROR][logstash.agent ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of #, input, filter, output at line 1, column 1 (byte 1) after ", :backtrace=>["/opt/logstash-6.1.3/logstash-core/lib/logstash/compiler.rb:42:in compile_imperative'", "/opt/logstash-6.1.3/logstash-core/lib/logstash/compiler.rb:50:incompile_graph'", "/opt/logstash-6.1.3/logstash-core/lib/logstash/compiler.rb:12:in block in compile_sources'", "org/jruby/RubyArray.java:2486:inmap'", "/opt/logstash-6.1.3/logstash-core/lib/logstash/compiler.rb:11:in compile_sources'", "/opt/logstash-6.1.3/logstash-core/lib/logstash/pipeline.rb:51:ininitialize'", "/opt/logstash-6.1.3/logstash-core/lib/logstash/pipeline.rb:171:in initialize'", "/opt/logstash-6.1.3/logstash-core/lib/logstash/pipeline_action/create.rb:40:inexecute'", "/opt/logstash-6.1.3/logstash-core/lib/logstash/agent.rb:335:in block in converge_state'", "/opt/logstash-6.1.3/logstash-core/lib/logstash/agent.rb:141:inwith_pipelines'", "/opt/logstash-6.1.3/logstash-core/lib/logstash/agent.rb:332:in block in converge_state'", "org/jruby/RubyArray.java:1734:ineach'", "/opt/logstash-6.1.3/logstash-core/lib/logstash/agent.rb:319:in converge_state'", "/opt/logstash-6.1.3/logstash-core/lib/logstash/agent.rb:166:inblock in converge_state_and_update'", "/opt/logstash-6.1.3/logstash-core/lib/logstash/agent.rb:141:in with_pipelines'", "/opt/logstash-6.1.3/logstash-core/lib/logstash/agent.rb:164:inconverge_state_and_update'", "/opt/logstash-6.1.3/logstash-core/lib/logstash/agent.rb:90:in execute'", "/opt/logstash-6.1.3/logstash-core/lib/logstash/runner.rb:343:inblock in execute'", "/opt/logstash-6.1.3/vendor/bundle/jruby/2.3.0/gems/stud-0.0.23/lib/stud/task.rb:24:in `block in initialize'"]}

Thanks
Chandra


#10

What does 'ls /opt/logstash-6.1.3/config/*' produce? Do you really just have the 2 .conf files in there?


(Chandukreddi) #11

Hi @Badger

Here you go!
[root@localhost config]# ls -ltr /opt/logstash-6.1.3/config/
total 92
-rw-r--r--. 1 elk elk 1702 Jan 26 11:56 startup.options
-rw-r--r--. 1 elk elk 6368 Jan 26 11:56 logstash.yml-org
-rw-r--r--. 1 elk elk 3958 Jan 26 11:56 log4j2.properties
-rw-r--r--. 1 elk elk 1846 Jan 26 11:56 jvm.options
-rw-r--r--. 1 elk elk 6392 Feb 27 11:56 logstash.yml
drwxrwxr-x. 2 elk elk 94 Feb 27 12:02 ${sys:ls.logs}
drwxr-xr-x. 2 root root 4096 Mar 12 09:12 backup
-rw-------. 1 root root 26896 Mar 12 10:54 nohup.out
-rw-r--r--. 1 root root 261 Mar 19 13:49 cass_log_sizing_2.json
-rw-r--r--. 1 root root 261 Mar 20 13:47 cass_log_sizing_2.json-org
-rw-r--r--. 1 root root 273 Mar 20 14:14 swift_proxy_log_sizing_2.jsonorg
-rw-r--r--. 1 root root 273 Mar 20 15:21 swift_proxy_log_sizing_2.json
-rw-r--r--. 1 root root 5486 Mar 21 07:52 cassandra-new.conf
-rw-r--r--. 1 root root 2796 Mar 21 07:53 swift_proxy.conf
-rw-r--r--. 1 elk elk 3229 Mar 21 07:57 pipelines.yml
drwxr-xr-x. 2 root root 89 Mar 21 09:06 backups

Thanks
Chandra


#12

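If you pass a directory to -f then it concatenates every file in the directory and uses that as the config. That is why I said to make sure you just have the two files you want in that directory, with no backups or alternate versions of them.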

Create a subdirectory of config and put just the two files you want in it, then pass that to -f.


(Chandukreddi) #13

Hi @Badger,

Now I am running into the error below after making the changes you suggested.
Error:
[2018-03-21T12:06:46,636][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//10.1.27.6:9200"]}
[2018-03-21T12:06:47,368][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>3, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>375, :thread=>"#<Thread:0xfaee021 run>"}
[2018-03-21T12:06:47,750][INFO ][logstash.inputs.beats ] Beats inputs: Starting input listener {:address=>"0.0.0.0:5044"}
[2018-03-21T12:06:47,805][INFO ][logstash.inputs.beats ] Beats inputs: Starting input listener {:address=>"0.0.0.0:5044"}
[2018-03-21T12:06:47,814][INFO ][logstash.pipeline ] Pipeline started {"pipeline.id"=>"main"}
[2018-03-21T12:06:47,915][INFO ][org.logstash.beats.Server] Starting server on port: 5044
[2018-03-21T12:06:47,919][INFO ][org.logstash.beats.Server] Starting server on port: 5044
[2018-03-21T12:06:48,143][INFO ][logstash.agent ] Pipelines running {:count=>1, :pipelines=>["main"]}
[2018-03-21T12:06:54,465][ERROR][logstash.pipeline ] A plugin had an unrecoverable error. Will restart this plugin.
Pipeline_id:main
Plugin: <LogStash::Inputs::Beats port=>5044, id=>"840fec33db0842ec3625d5ca45c27201c42270652f98b0e518ee9e959900e251", enable_metric=>true, codec=><LogStash::Codecs::Plain id=>"plain_110114dc-5f9c-4ea0-bf3d-1e09efcd3aa4", enable_metric=>true, charset=>"UTF-8">, host=>"0.0.0.0", ssl=>false, ssl_verify_mode=>"none", include_codec_tag=>true, ssl_handshake_timeout=>10000, tls_min_version=>1, tls_max_version=>1.2, cipher_suites=>["TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384", "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384", "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256", "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256", "TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384", "TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384", "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256", "TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256"], client_inactivity_timeout=>60, executor_threads=>32>
Error: Address already in use
Exception: Java::JavaNet::BindException
Stack: sun.nio.ch.Net.listen(Native Method)
sun.nio.ch.ServerSocketChannelImpl.bind(sun/nio/ch/ServerSocketChannelImpl.java:224)
io.netty.channel.socket.nio.NioServerSocketChannel.doBind(io/netty/channel/socket/nio/NioServerSocketChannel.java:128)
io.netty.channel.AbstractChannel$AbstractUnsafe.bind(io/netty/channel/AbstractChannel.java:558)
io.netty.channel.DefaultChannelPipeline$HeadContext.bind(io/netty/channel/DefaultChannelPipeline.java:1283)
io.netty.channel.AbstractChannelHandlerContext.invokeBind(io/netty/channel/AbstractChannelHandlerContext.java:501)
io.netty.channel.AbstractChannelHandlerContext.bind(io/netty/channel/AbstractChannelHandlerContext.java:486)
io.netty.channel.DefaultChannelPipeline.bind(io/netty/channel/DefaultChannelPipeline.java:989)
io.netty.channel.AbstractChannel.bind(io/netty/channel/AbstractChannel.java:254)
io.netty.bootstrap.AbstractBootstrap$2.run(io/netty/bootstrap/AbstractBootstrap.java:364)
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(io/netty/util/concurrent/AbstractEventExecutor.java:163)
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(io/netty/util/concurrent/SingleThreadEventExecutor.java:403)
io.netty.channel.nio.NioEventLoop.run(io/netty/channel/nio/NioEventLoop.java:463)
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(io/netty/util/concurrent/SingleThreadEventExecutor.java:858)
io.netty.util.concurrent.FastThreadLocalRunnable.run(io/netty/util/concurrent/FastThreadLocalRunnable.java:30)
java.lang.Thread.run(java/lang/Thread.java:745)
[2018-03-21T12:06:55,473][INFO ][org.logstash.beats.Server] Starting server on port: 5044
[2018-03-21T12:06:55,475][WARN ][io.netty.channel.AbstractChannel] Force-closing a channel whose registration task was not accepted by an event loop: [id: 0xb2c6eb27]
java.util.concurrent.RejectedExecutionException: event executor terminated
at io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:821) ~[netty-all-4.1.18.Final.jar:4.1.18.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:327) ~[netty-all-4.1.18.Final.jar:4.1.18.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:320) ~[netty-all-4.1.18.Final.jar:4.1.18.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:746) ~[netty-all-4.1.18.Final.jar:4.1.18.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.register(AbstractChannel.java:479) [netty-all-4.1.18.Final.jar:4.1.18.Final]
at io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:80) [netty-all-4.1.18.Final.jar:4.1.18.Final]
at io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:74) [netty-all-4.1.18.Final.jar:4.1.18.Final]
at io.netty.channel.MultithreadEventLoopGroup.register(MultithreadEventLoopGroup.java:86) [netty-all-4.1.18.Final.jar:4.1.18.Final]
at io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:331) [netty-all-4.1.18.Final.jar:4.1.18.Final]
at io.netty.bootstrap.AbstractBootstrap.doBind(AbstractBootstrap.java:282) [netty-all-4.1.18.Final.jar:4.1.18.Final]
at io.netty.bootstrap.AbstractBootstrap.bind(AbstractBootstrap.java:278) [netty-all-4.1.18.Final.jar:4.1.18.Final]
at io.netty.bootstrap.AbstractBootstrap.bind(AbstractBootstrap.java:260) [netty-all-4.1.18.Final.jar:4.1.18.Final]
at org.logstash.beats.Server.listen(Server.java:65) [logstash-input-beats-5.0.6.jar:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_65]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_65]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_65]
at java.lang.reflect.Method.invoke(Method.java:497) ~[?:1.8.0_65]
at org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(JavaMethod.java:438) [?:?]
at org.jruby.javasupport.JavaMethod.invokeDirect(JavaMethod.java:302) [?:?]

Thanks
Chandra


#14

You are trying to start two beats inputs on the same port. One of them starts, the other gets 'Address already in use'. Remove the input from one of the two files.
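
For example, swift_proxy.conf could be reduced to just its filter section, as sketched below. This assumes the beats input and the conditional outputs both stay in cassandra-new.conf, as shown in post #7; the filter itself is copied from the first post.

```logstash
# swift_proxy.conf with its input { beats { ... } } block removed.
# The beats input on port 5044 now exists only in cassandra-new.conf.
filter {
  if "swift_proxy_test" in [tags] {
    grok {
      patterns_dir => ["path"]
      break_on_match => true
      match => { "message" => [ "%{SWIFT_P_ALL}" ] }
      add_tag => [ "swift_all_parsed" ]
    }

    # Fallback pattern for lines the first grok did not match.
    if "swift_all_parsed" not in [tags] {
      grok {
        patterns_dir => ["path"]
        match => { "message" => ["%{SWIFT_P_204_499}"] }
        add_tag => [ "swift_rest" ]
      }
    }
  }
}
```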


(Chandukreddi) #15

Excellent, @Badger! I really appreciate your time in solving my problem :)

