Filebeat/Elastic/ECS source field conflict

I am having an issue that seems it may be configurable, but I am not sure where to look next. I have filebeat setup up and most of my traffic is standard syslog stuff. I have one feed that is using a non-typical path though. For example, /var/log/mydevice instead of /var/syslog/mydevice//mydevice.log etc.

The same log entry works great from a syslog path, but not from the alternate path, which I will need to use for reasons out of my control. The issue is on the logstash side though, I think. My filter uses a json filter, so:

filter {
  json {
    source => "filebeat_data"
  }
  mutate {
    add_field => {
      "[source][ip]" => "1.1.1.1"
    }
  }
}

When I run this from the alternate path I get a very nasty crash of the whole logstash pipeline (I can provide a log if necessary), but it works correctly when using the syslog path. It is very clearly a conflict with the 'source' keyword here, which is important to ECS, but also the JSON filter. Has anyone else run into this?
logstash 6.7, filebeat 6.1.3

Please do.

Logstash.conf:

input {
  beats {
    port => 3520
  }
}

filter {
  json { source => "filebeat_data" }
  mutate { add_field => { "[source][ip]" => "whattheactualf" } }
}

output {
#  elasticsearch {
#    hosts => ["https://x.x.x.x:9200"]
...
#  }

}

Filebeat.yml:

filebeat.prospectors:
  - paths:
      - /var/log/mydevice/mydevice-http.log
    tags: ["mydevice-json-http"]
    close_renamed: true
    close_removed: true

  - paths:
      - /var/log/mydevice-http.log
    tags: ["mydevice-json-http"]
    close_renamed: true
    close_removed: true

  - paths:
      - /var/syslog/mydevice/mydevice-http.log
    tags: ["mydevice-json-http"]
    close_renamed: true
    close_removed: true

output.logstash:
  hosts: ["x.x.x.x:3520"]
  compression_level: 3
  worker: 6
  loadbalance: true

The log is massive character-wise but only a few lines. I tried to truncate a lot of noise in the middle.

[2020-01-02T21:12:09,837][DEBUG][logstash.pipeline        ] filter received {"event"=>{"@version"=>"1", "message"=>"{\"timestamp\":\"2020-01-03T02:12:03+0000\",\"rule\":{},\"agent\":{\"id\":\"000\",\"name\":\"ip-10-170-128-170\"},\"manager\":{\"name\":\"ip-10-170-128-170\"},\"id\":\"1578017523.3398\",\"full_log\":\"Jan  3 02:12:01 ip-10-170-128-170 CRON[2956]: pam_unix(cron:session): session opened for user root by (uid=0)\",\"predecoder\":{\"program_name\":\"CRON\",\"timestamp\":\"Jan  3 02:12:01\",\"hostname\":\"ip-10-170-128-170\"},\"decoder\":{\"parent\":\"pam\",\"name\":\"pam\"},\"data\":{\"dstuser\":\"root\",\"uid\":\"0\"},\"location\":\"/var/log/auth.log\"}", "tags"=>["fullQueue", "beats_input_codec_plain_applied"], "offset"=>105663, "source"=>"/var/ossec/logs/archives/archives.json", "host"=>"ip-10-170-128-170", "beat"=>{"name"=>"ip-10-170-128-170", "version"=>"6.1.3", "hostname"=>"ip-10-170-128-170"}, "@timestamp"=>2020-01-03T02:12:07.709Z}}
[2020-01-02T21:12:09,837][DEBUG][logstash.filters.json    ] Running json filter {:event=>#<LogStash::Event:0x481c8c8b>}
[2020-01-02T21:12:09,838][DEBUG][logstash.pipeline        ] filter received {"event"=>{"@version"=>"1", "message"=>"{\"timestamp\":\"2020-01-03T02:12:03+0000\",\"rule\":{},\"agent\":{\"id\":\"000\",\"name\":\"ip-10-170-128-170\"},\"manager\":{\"name\":\"ip-10-170-128-170\"},\"id\":\"1578017523.3398\",\"full_log\":\"Jan  3 02:12:01 ip-10-170-128-170 CRON[2957]: (root) CMD (/usr/local/sbin/openvpn-status.sh >/dev/null 2>&1)\",\"predecoder\":{\"program_name\":\"CRON\",\"timestamp\":\"Jan  3 02:12:01\",\"hostname\":\"ip-10-170-128-170\"},\"decoder\":{},\"location\":\"/var/log/syslog\"}", "tags"=>["fullQueue", "beats_input_codec_plain_applied"], "offset"=>106086, "source"=>"/var/ossec/logs/archives/archives.json", "host"=>"ip-10-170-128-170", "beat"=>{"name"=>"ip-10-170-128-170", "version"=>"6.1.3", "hostname"=>"ip-10-170-128-170"}, "@timestamp"=>2020-01-03T02:12:07.709Z}}
[2020-01-02T21:12:09,838][DEBUG][logstash.filters.json    ] Running json filter {:event=>#<LogStash::Event:0x558224c2>}
[2020-01-02T21:12:09,840][ERROR][logstash.pipeline        ] org.logstash.Accessors.setChild(Accessors.java:122)", "org.logstash.Accessors.set(Accessors.java:16)", "org.logstash.Event.setField(Event.java:188)", "org.logstash.ext.JrubyEventExtLibrary$RubyEvent.ruby_set_field(JrubyEventExtLibrary.java:99)", "org.logstash.ext.JrubyEventExtLibrary$RubyEvent$INVOKER$i$2$0$ruby_set_field.call(JrubyEventExtLibrary$RubyEvent$INVOKER$i$2$0$ruby_set_field.gen)", "org.jruby.runtime.callsite.CachingCallSite.call
...(RubyHash.java:1438)", "org.jruby.RubyHash$12.visit(RubyHash.java:1435)", "org.jruby.RubyHash.visitLimited(RubyHash.java:690)", "org.jruby.RubyHash.visitAll(RubyHash.java:675)", "org.jruby.RubyHash.iteratorVisitAll(RubyHash.java:1395)", "org.jruby.RubyHash.each_pairCommon(RubyHash.java:1430)", "org.jruby.RubyHash.each(RubyHash.java:1419)", "org.jruby.RubyHash$INVOKER$i$0$0$each.call(RubyHash$INVOKER$i$0$0$each.gen)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:149)", "org.jruby.runtime.callsite.CachingCallSite.callIter(CachingCallSite.java:158)", "usr.share.logstash.logstash_minus_core.lib.logstash.util.decorators.invokeOther20:each(/usr/share/logstash/logstash-core/lib/logstash/util/decorators.rb:14)", "usr.share.logstash.logstash_minus_core.lib.logstash.util.decorators.RUBY$method$add_fields$0(/usr/share/logstash/logstash-core/lib/logstash/util/decorators.rb:14)", "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:143)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:221)", "org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:215)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:232)", "usr.share.logstash.logstash_minus_core.lib.logstash.filters.base.invokeOther32:add_fields(/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:182)", "usr.share.logstash.logstash_minus_core.lib.logstash.filters.base.RUBY$method$filter_matched$0(/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:182)", "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:117)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:156)", "org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:199)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:170)", 
"usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_filter_minus_mutate_minus_3_dot_4_dot_0.lib.logstash.filters.mutate.invokeOther30:filter_matched(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-mutate-3.4.0/lib/logstash/filters/mutate.rb:264)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_filter_minus_mutate_minus_3_dot_4_dot_0.lib.logstash.filters.mutate.RUBY$method$filter$0(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-mutate-3.4.0/lib/logstash/filters/mutate.rb:264)", "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:117)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:156)", "org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:357)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:182)", "usr.share.logstash.logstash_minus_core.lib.logstash.filters.base.invokeOther4:filter(/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:143)", "usr.share.logstash.logstash_minus_core.lib.logstash.filters.base.RUBY$method$do_filter$0(/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:143)", "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:117)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:156)", "org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:357)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:182)", "org.jruby.runtime.callsite.CachingCallSite.callIter(CachingCallSite.java:189)", "usr.share.logstash.logstash_minus_core.lib.logstash.filters.base.invokeOther4:do_filter(/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:162)", "usr.share.logstash.logstash_minus_core.lib.logstash.filters.base.RUBY$block$multi_filter$1(/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:162)", 
"org.jruby.runtime.CompiledIRBlockBody.yieldDirect(CompiledIRBlockBody.java:146)", "org.jruby.runtime.BlockBody.yield(BlockBody.java:114)", "org.jruby.runtime.Block.yield(Block.java:165)", "org.jruby.RubyArray.each(RubyArray.java:1792)", "org.jruby.RubyArray$INVOKER$i$0$0$each.call(RubyArray$INVOKER$i$0$0$each.gen)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:149)", "org.jruby.runtime.callsite.CachingCallSite.callIter(CachingCallSite.java:158)", "usr.share.logstash.logstash_minus_core.lib.logstash.filters.base.invokeOther7:each(/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:159)", "usr.share.logstash.logstash_minus_core.lib.logstash.filters.base.RUBY$method$multi_filter$0(/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:159)", "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:117)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:156)", "org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:199)", "org.logstash.config.ir.compiler.FilterDelegatorExt.doMultiFilter(FilterDelegatorExt.java:99)", "org.logstash.config.ir.compiler.AbstractFilterDelegatorExt.multiFilter(AbstractFilterDelegatorExt.java:115)", "org.logstash.config.ir.compiler.AbstractFilterDelegatorExt$INVOKER$i$1$0$multiFilter.call(AbstractFilterDelegatorExt$INVOKER$i$1$0$multiFilter.gen)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:170)", "org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:317)", "org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:72)", "org.jruby.ir.interpreter.Interpreter.INTERPRET_BLOCK(Interpreter.java:128)", "org.jruby.runtime.MixedModeIRBlockBody.commonYieldPath(MixedModeIRBlockBody.java:151)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:79)", "org.jruby.runtime.Block.call(Block.java:124)", "org.jruby.RubyProc.call(RubyProc.java:286)", 
"org.jruby.internal.runtime.methods.ProcMethod.call(ProcMethod.java:64)", "org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:203)", "org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:199)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:170)", "usr.share.logstash.logstash_minus_core.lib.logstash.pipeline.invokeOther3:filter_func(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:358)", "usr.share.logstash.logstash_minus_core.lib.logstash.pipeline.RUBY$method$filter_batch$0(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:358)", "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:117)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:156)", "org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:199)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:170)", "org.jruby.ir.interpreter.InterpreterEngine.processCall(InterpreterEngine.java:317)", "org.jruby.ir.interpreter.StartupInterpreterEngine.interpret(StartupInterpreterEngine.java:72)", "org.jruby.ir.interpreter.InterpreterEngine.interpret(InterpreterEngine.java:92)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.INTERPRET_METHOD... (BlockBody.java:114)", "org.jruby.runtime.Block.yield(Block.java:165)", "org.jruby.RubyHash$12.visit(RubyHash.java:1438)", "org.jruby.RubyHash$12.visit(RubyHash.java:1435)", "org.jruby.RubyHash.visitLimited(RubyHash.java:690)", "org.jruby.RubyHash.visitAll(RubyHash.java:675)", "org.jruby.RubyHash.iteratorVisitAll(RubyHash.java:1395)", "org.jruby.RubyHash.each_pairCommon...
core/lib/logstash/filters/base.rb:162)", "org.jruby.runtime.CompiledIRBlockBody.yieldDirect(CompiledIRBlockBody.java:146)", "org.jruby.runtime.BlockBody.yield(BlockBody.java:114)", "org.jruby.runtime.Block.yield(Block.java:165)", "org.jruby.RubyArray.each(RubyArray.java:1792)", "org.jruby.RubyArray$INVOKER$i$0$0$each.call(RubyArray$INVOKER$i$0$0$each.gen)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:149)", "org.jruby.runtime.callsite.CachingCallSite.callIter(CachingCallSite.java:158)", "usr.share.logstash.logstash_minus_core.lib.logstash.filters.base.invokeOther7:each(/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:159)", "usr.share.logstash.logstash_minus_core.lib.logstash.filters.base.RUBY$method$multi_filter$0(/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:159)", "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:117)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:156)", "org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:199)", "org.logstash.config.ir.compiler.FilterDelegatorExt.doMultiFilter(FilterDelegatorExt.java:99)", "org.logstash.config.ir.compiler.AbstractFilterDelegatorExt.multiFilter(AbstractFilterDelegatorExt.java:115)", "org.logstash.config.ir.compiler.AbstractFilterDelegatorExt$INVOKER$i$1$0$multiFilter.call(AbstractFilterDelegatorExt$INVOKER$i$1$0$multiFilter.gen)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:170)", "org.jruby.ir.interpreter.InterpreterEngine.processCall(StartupInterpreterEngine.java:72)", "org.jruby.ir.interpreter.Interpreter.INTERPRET_BLOCK(Interpreter.java:128)", "org.jruby.runtime.MixedModeIRBlockBody.commonYieldPath(MixedModeIRBlockBody.java:151)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:79)", "org.jruby.runtime.Block.call(Block.java:124)", "org.jruby.RubyProc.call(RubyProc.java:286)", "org.jruby.RubyProc.call(RubyProc.java:270)", 
"org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:105)", "java.lang.Thread.run(Thread.java:748)"]}
[2020-01-02T21:12:09,870][ERROR][org.logstash.Logstash    ] java.lang.IllegalStateException: Logstash stopped processing because of an error: (SystemExit) exit
[2020-01-02T21:12:18,699][DEBUG][logstash.modules.scaffold] Found module {:module_name=>"fb_apache", :directory=>"/usr/share/logstash/modules/fb_apache/configuration"}

That's odd. I suspect it is related to the value of [source] before you try to add [source][ip]. If [source] were a text value I would expect you to get an InvalidFieldSetException which looks like this:

Exception caught while applying mutate filter {:exception=>"Could not set field 'ip' on object 'foo' to value 'whattheactualf'.This is probably due to trying to set a field like [foo][bar] = someValuewhen [foo] is not either a map or a string"}

Not sure why the exception is not getting caught and logged.

Try changing the name of the field you add to [xsource][ip] and verify the error goes away.

It does go away if I change the field to something other than source. It definitely appears to be some sort of odd conflict, even more strange since it only happens on the /var/log/xxx path and not the syslog path. I would expect to see the issue happen regardless of the log source path, you know?

Okay, I have the solution. Partially my own dev error, but also a logstash gotcha. It looks like filebeat sends the source field over as part of log entries. So, you will get a "source => /var/log/mydevice.log" key pair. Therefore, Badger's response earlier gave me the cue, as I am trying to assign an object to a string field that already exists. Odd that I did not get an error about it, but that was the case. Simply removing/renaming that source field prior to creating the object source fixes it.

The reason I was getting it on only one path was a handy little line dropped in an earlier filter that actually fixed this for syslog only, though I doubt the author realized they were even doing that.
My thanks to Badger for pointing me the right way.
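Putting the fix from this thread into a complete pipeline, as an added example rather than the poster's own config: Filebeat 6.x sends the path of the harvested file in a string field named source, so a later add_field on [source][ip] tries to set a child on a string, which is what aborted the pipeline above. Renaming source before the ECS object is created removes the conflict. This block assumes the JSON document arrives in [message] (as the debug log shows) rather than in filebeat_data, the destination name [log][file][path] is a choice made here (it is the ECS field later Filebeat releases use for the file path), the IP value is a placeholder, and the elasticsearch output is replaced by stdout for local testing.

```logstash
input {
  beats {
    port => 3520
  }
}

filter {
  # Assumption: the JSON payload is in [message], as in the debug log.
  json {
    source => "message"
  }

  # Filebeat 6.x ships the harvested file's path as the string field [source].
  # Move it aside right before [source] is reused as an object. rename is a
  # no-op when the field is absent, so inputs without it are unaffected.
  mutate {
    rename => { "source" => "[log][file][path]" }
  }

  # [source] is now free to become an ECS object (placeholder IP value).
  mutate {
    add_field => { "[source][ip]" => "192.0.2.1" }
  }
}

output {
  # Replace with the elasticsearch output once the field layout looks right.
  stdout { codec => rubydebug }
}
```

Run it with bin/logstash -f on this file and send one event through Filebeat; the rubydebug output should show the original path under [log][file][path] and [source] as a nested map containing ip. Logstash conditionals can test whether a field exists but not what type it holds, so ordering the rename ahead of add_field in every pipeline that receives Filebeat events is the reliable guard, rather than relying on an earlier filter happening to strip the field for one path only.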
