Logstash Filter for JSON data


(Guta Saputra) #1

Hello,

I have the same issue as this post.

The difference is that he uses Filebeat to send data to Logstash, which then sends it on to Elasticsearch.

I use Logstash to load my log file (written in JSON) and then export it to Elasticsearch.
The problem is that my JSON is not split into separate fields in Elasticsearch.
Instead, everything is merged into a single field named message.

"message": "{ \n "user_id":0,\n "cart_id":"222",\n "error_status":"",\n "error_message":"",\n "data":{ \n "ord_id":2233,\n "cart":{ \n "cart_id":"222",\n "session_id":"7afaaf7fac9bb934de4c6ecb2567461a6178495e",\n "ip_address":"180.254.65.91",\n "usr_id":"0",\n "created_date":"2017-01-19 17:29:20",\n "updated_date":"2017-01-19 18:08:35"\n },\n "cart_data":{ \n "coupon_code":"",\n "ord_email":"gutasaputra@gmail.com",\n "ord_firstname":"guta"}"

What I want is something like this:

"message": "{
user_id:0,
cart_id:3486,
ord_email:gutasaputra@gmail.com
}"

How can I do that with Logstash?

Oh, and here is my Logstash config:

input
{
    file
    {
        codec => multiline
        {
            pattern => '^\{'
            negate => true
            what => previous
        }

        path => ["/usr/local/Cellar/logstash/5.1.1/test_payment.json"]
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }
}

filter {
    if [message] =~ /^{.*}$/
    {
        json { source => message }
    }

}

output {
    elasticsearch {
        hosts => ["localhost:9200"]
        user => elastic
        password => techno2013
        index => "test_payment"
    }
    stdout { codec => rubydebug }
}

Thank you.


(Magnus Bäck) #2

The correct solution is to use a json filter, just as you're doing now. If it doesn't seem to be working, check that there's nothing wrong with your conditional (i.e. try commenting it out) and that the JSON parsing isn't failing (check the Logstash log).
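
As a rough sketch of that check, assuming the json filter's default tag_on_failure setting is unchanged (in which case events it cannot parse are tagged _jsonparsefailure), the conditional can be removed from the filter and failed events printed on their own. The routing in the output section is only an illustration, not part of the original configuration:

```
filter {
    # No conditional here: every event is handed to the json filter.
    json { source => "message" }
}

output {
    # Events the json filter could not parse carry the _jsonparsefailure tag
    # (the default value of the filter's tag_on_failure option).
    if "_jsonparsefailure" in [tags] {
        stdout { codec => rubydebug }
    }
}
```

If nothing is printed and the parsed fields still do not appear, the event probably never reached the json filter in the first place.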


(Guta Saputra) #3

Here is my Logstash log file.

[2017-01-24T13:55:11,592][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["localhost:9200"]}
[2017-01-24T13:55:11,596][INFO ][logstash.pipeline ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}
[2017-01-24T13:55:11,637][INFO ][logstash.pipeline ] Pipeline main started
[2017-01-24T13:55:11,700][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}

The only error message in my log file is this:

[2017-01-24T13:53:02,894][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
"org.jruby.runtime.CompiledBlock19.yield(CompiledBlock19.java:159)", "org.jruby.runtime.CompiledBlock19.call(CompiledBlock19.java:87)", "org.jruby.runtime.Block.call(Block.java:101)", "org.jruby.RubyProc.call(RubyProc.java:300)", "org.jruby.RubyProc.call19(RubyProc.java:281)", "org.jruby.RubyProc$INVOKER$i$0$0$call19.call(RubyProc$INVOKER$i$0$0$call19.gen)", "org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:210)", "org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:206)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:168)", "rubyjit.LogStash::Util::WrappedSynchronousQueue::ReadBatch$$each_8ee46fd8c62d1155253b7a084f2e67ff25953d7e1956725890.block_0$RUBY$file(/usr/local/Cellar/logstash/5.1.1/libexec/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb:192)", "rubyjit$LogStash::Util::WrappedSynchronousQueue::ReadBatch$$each_8ee46fd8c62d1155253b7a084f2e67ff25953d7e1956725890$block_0$RUBY$file.call(rubyjit$LogStash::Util::WrappedSynchronousQueue::ReadBatch$$each_8ee46fd8c62d1155253b7a084f2e67ff25953d7e1956725890$block_0$RUBY$file)", "org.jruby.runtime.CompiledBlock19.yield(CompiledBlock19.java:135)", "org.jruby.runtime.Block.yield(Block.java:142)", "org.jruby.RubyHash$13.visit(RubyHash.java:1355)", "org.jruby.RubyHash.visitLimited(RubyHash.java:648)", "org.jruby.RubyHash.visitAll(RubyHash.java:634)", "org.jruby.RubyHash.iteratorVisitAll(RubyHash.java:1306)", "org.jruby.RubyHash.each_pairCommon(RubyHash.java:1351)", "org.jruby.RubyHash.each19(RubyHash.java:1342)", "org.jruby.RubyHash$INVOKER$i$0$0$each19.call(RubyHash$INVOKER$i$0$0$each19.gen)", "org.jruby.runtime.callsite.CachingCallSite.callBlock(CachingCallSite.java:143)", "org.jruby.runtime.callsite.CachingCallSite.callIter(CachingCallSite.java:154)", 
"rubyjit.LogStash::Util::WrappedSynchronousQueue::ReadBatch$$each_8ee46fd8c62d1155253b7a084f2e67ff25953d7e1956725890.file(/usr/local/Cellar/logstash/5.1.1/libexec/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb:191)", "rubyjit.LogStash::Util::WrappedSynchronousQueue::ReadBatch$$each_8ee46fd8c62d1155253b7a084f2e67ff25953d7e1956725890.file(/usr/local/Cellar/logstash/5.1.1/libexec/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb)", "org.jruby.internal.runtime.methods.JittedMethod.call(JittedMethod.java:161)", "org.jruby.runtime.callsite.CachingCallSite.callBlock(CachingCallSite.java:143)", "org.jruby.runtime.callsite.CachingCallSite.callIter(CachingCallSite.java:154)", "rubyjit.LogStash::Pipeline$$filter_batch_390eee8140cc612b1771fb72e9904822f536cabf1956725890.chained_0_rescue_1$RUBY$SYNTHETIC__file__(/usr/local/Cellar/logstash/5.1.1/libexec/logstash-core/lib/logstash/pipeline.rb:294)", "rubyjit.LogStash::Pipeline$$filter_batch_390eee8140cc612b1771fb72e9904822f536cabf1956725890.file(/usr/local/Cellar/logstash/5.1.1/libexec/logstash-core/lib/logstash/pipeline.rb)", "rubyjit.LogStash::Pipeline$$filter_batch_390eee8140cc612b1771fb72e9904822f536cabf1956725890.file(/usr/local/Cellar/logstash/5.1.1/libexec/logstash-core/lib/logstash/pipeline.rb)", "org.jruby.internal.runtime.methods.JittedMethod.call(JittedMethod.java:181)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:168)", "org.jruby.ast.FCallOneArgNode.interpret(FCallOneArgNode.java:36)", "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)", "org.jruby.ast.BlockNode.interpret(BlockNode.java:71)", "org.jruby.ast.WhileNode.interpret(WhileNode.java:131)", "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)", "org.jruby.ast.BlockNode.interpret(BlockNode.java:71)", "org.jruby.evaluator.ASTInterpreter.INTERPRET_METHOD(ASTInterpreter.java:74)", "org.jruby.internal.runtime.methods.InterpretedMethod.call(InterpretedMethod.java:225)", 
"org.jruby.internal.runtime.methods.DefaultMethod.call(DefaultMethod.java:219)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:202)", "org.jruby.ast.FCallTwoArgNode.interpret(FCallTwoArgNode.java:38)", "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)", "org.jruby.ast.BlockNode.interpret(BlockNode.java:71)", "org.jruby.evaluator.ASTInterpreter.INTERPRET_BLOCK(ASTInterpreter.java:112)", "org.jruby.runtime.Interpreted19Block.evalBlockBody(Interpreted19Block.java:206)", "org.jruby.runtime.Interpreted19Block.yield(Interpreted19Block.java:194)", "org.jruby.runtime.Interpreted19Block.call(Interpreted19Block.java:125)", "org.jruby.runtime.Block.call(Block.java:101)", "org.jruby.RubyProc.call(RubyProc.java:300)", "org.jruby.RubyProc.call(RubyProc.java:230)", "org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:99)", "java.lang.Thread.run(Thread.java:745)"]}

I think that was caused by my earlier filter configuration.
I had tried this filter:

filter {
    if [tags][json] {
        json {
            source => "message"
        }
    }
}

and then the error appeared.

So, what should I do, sir?
I have searched all over the web but still have not found a solution.
Could you please help me?

Thank you.


(Magnus Bäck) #4
if [tags][json] {

I don't know what this is supposed to mean. For now drop the conditional and focus on getting things working.


(Guta Saputra) #5

That was my old configuration, sir.

My new one is this:

filter {
    if [message] =~ /^{.*}$/
    {
        json { source => message }
    }
}


(Magnus Bäck) #6

I repeat: Drop the conditional and focus on getting things working.
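
A minimal sketch of the filter with the conditional dropped is shown below. One plausible reason the conditional never matches (an assumption, not confirmed in this thread) is that the multiline codec produces a message spanning many lines, while `.` in the pattern /^{.*}$/ does not match a newline, so the pattern would need the opening and closing braces on the same line. The remove_field setting is an optional addition that is not in the original configuration:

```
filter {
    # Parse the whole multiline message as JSON. Without a target option,
    # parsed keys such as user_id and cart_id become top-level event fields.
    json {
        source => "message"
        # Optional (assumption: the raw string is no longer wanted).
        # remove_field is only applied when parsing succeeds.
        remove_field => ["message"]
    }
}
```

Nested values stay nested after parsing, so ord_email from the sample would be referenced as [data][cart_data][ord_email].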

