Logstash Filter for JSON data

Hello,

I have the same issue as in this post.

The difference is that he uses Filebeat to send data to Logstash, which then sends it on to Elasticsearch.

I use Logstash to load my log file (written in JSON) and then export it to Elasticsearch.
The problem is that my JSON is not split into separate fields in Elasticsearch.
Everything seems to be merged into a single field named message.

"message": "{ \n "user_id":0,\n "cart_id":"222",\n "error_status":"",\n "error_message":"",\n "data":{ \n "ord_id":2233,\n "cart":{ \n "cart_id":"222",\n "session_id":"7afaaf7fac9bb934de4c6ecb2567461a6178495e",\n "ip_address":"180.254.65.91",\n "usr_id":"0",\n "created_date":"2017-01-19 17:29:20",\n "updated_date":"2017-01-19 18:08:35"\n },\n "cart_data":{ \n "coupon_code":"",\n "ord_email":"gutasaputra@gmail.com",\n "ord_firstname":"guta"}"

What I want is something like this:

"message": "{
user_id:0,
cart_id:3486,
ord_email:gutasaputra@gmail.com
}"

How can I do that with Logstash?

Oh, and here is my Logstash configuration:

input
{
    file
    {
        codec => multiline
        {
            pattern => '^\{'
            negate => true
            what => previous
        }

        path => ["/usr/local/Cellar/logstash/5.1.1/test_payment.json"]
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }
}

filter {
    if [message] =~ /^{.*}$/
    {
        json { source => message }
    }

}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    user => elastic
    password => techno2013
    index => "test_payment"
 }
  stdout { codec => rubydebug }
}

Thank you.

The correct solution is to use a json filter, just as you're doing now. If it doesn't seem to be working, check that there's nothing wrong with your conditional (i.e. try commenting it out) and that the JSON parsing isn't failing (check the Logstash log).
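A minimal sketch of what running the filter without the conditional could look like. One possible reason the original condition never matches: Logstash conditionals use Ruby regular expressions, where `.` does not match a newline by default, so `/^{.*}$/` would need the opening and closing braces on the same line, while the multiline codec produces a message that contains `\n`. The `_jsonparsefailure` tag is the json filter's default failure tag; the `json_failed` field is hypothetical and only there to make failures easy to spot in the rubydebug output.

```
filter {
    # Parse every event unconditionally; no regex guard.
    json {
        source => "message"
    }

    # The json filter tags events it cannot parse with
    # "_jsonparsefailure" (its default tag_on_failure value).
    if "_jsonparsefailure" in [tags] {
        mutate {
            # Hypothetical marker field, for illustration only.
            add_field => { "json_failed" => "true" }
        }
    }
}
```

If events come out with that tag, the Logstash log should contain a matching parse warning that shows the offending text.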

Here is my Logstash log file.

[2017-01-24T13:55:11,592][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::Elasticsearch", :hosts=>["localhost:9200"]}
[2017-01-24T13:55:11,596][INFO ][logstash.pipeline ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}
[2017-01-24T13:55:11,637][INFO ][logstash.pipeline ] Pipeline main started
[2017-01-24T13:55:11,700][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}

The only error message I had in my log file is this:

[2017-01-24T13:53:02,894][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
"org.jruby.runtime.CompiledBlock19.yield(CompiledBlock19.java:159)", "org.jruby.runtime.CompiledBlock19.call(CompiledBlock19.java:87)", "org.jruby.runtime.Block.call(Block.java:101)", "org.jruby.RubyProc.call(RubyProc.java:300)", "org.jruby.RubyProc.call19(RubyProc.java:281)", "org.jruby.RubyProc$INVOKER$i$0$0$call19.call(RubyProc$INVOKER$i$0$0$call19.gen)", "org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:210)", "org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:206)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:168)", "rubyjit.LogStash::Util::WrappedSynchronousQueue::ReadBatch$$each_8ee46fd8c62d1155253b7a084f2e67ff25953d7e1956725890.block_0$RUBY$file(/usr/local/Cellar/logstash/5.1.1/libexec/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb:192)", "rubyjit$LogStash::Util::WrappedSynchronousQueue::ReadBatch$$each_8ee46fd8c62d1155253b7a084f2e67ff25953d7e1956725890$block_0$RUBY$file.call(rubyjit$LogStash::Util::WrappedSynchronousQueue::ReadBatch$$each_8ee46fd8c62d1155253b7a084f2e67ff25953d7e1956725890$block_0$RUBY$file)", "org.jruby.runtime.CompiledBlock19.yield(CompiledBlock19.java:135)", "org.jruby.runtime.Block.yield(Block.java:142)", "org.jruby.RubyHash$13.visit(RubyHash.java:1355)", "org.jruby.RubyHash.visitLimited(RubyHash.java:648)", "org.jruby.RubyHash.visitAll(RubyHash.java:634)", "org.jruby.RubyHash.iteratorVisitAll(RubyHash.java:1306)", "org.jruby.RubyHash.each_pairCommon(RubyHash.java:1351)", "org.jruby.RubyHash.each19(RubyHash.java:1342)", "org.jruby.RubyHash$INVOKER$i$0$0$each19.call(RubyHash$INVOKER$i$0$0$each19.gen)", "org.jruby.runtime.callsite.CachingCallSite.callBlock(CachingCallSite.java:143)", "org.jruby.runtime.callsite.CachingCallSite.callIter(CachingCallSite.java:154)", 
"rubyjit.LogStash::Util::WrappedSynchronousQueue::ReadBatch$$each_8ee46fd8c62d1155253b7a084f2e67ff25953d7e1956725890.file(/usr/local/Cellar/logstash/5.1.1/libexec/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb:191)", "rubyjit.LogStash::Util::WrappedSynchronousQueue::ReadBatch$$each_8ee46fd8c62d1155253b7a084f2e67ff25953d7e1956725890.file(/usr/local/Cellar/logstash/5.1.1/libexec/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb)", "org.jruby.internal.runtime.methods.JittedMethod.call(JittedMethod.java:161)", "org.jruby.runtime.callsite.CachingCallSite.callBlock(CachingCallSite.java:143)", "org.jruby.runtime.callsite.CachingCallSite.callIter(CachingCallSite.java:154)", "rubyjit.LogStash::Pipeline$$filter_batch_390eee8140cc612b1771fb72e9904822f536cabf1956725890.chained_0_rescue_1$RUBY$SYNTHETIC__file__(/usr/local/Cellar/logstash/5.1.1/libexec/logstash-core/lib/logstash/pipeline.rb:294)", "rubyjit.LogStash::Pipeline$$filter_batch_390eee8140cc612b1771fb72e9904822f536cabf1956725890.file(/usr/local/Cellar/logstash/5.1.1/libexec/logstash-core/lib/logstash/pipeline.rb)", "rubyjit.LogStash::Pipeline$$filter_batch_390eee8140cc612b1771fb72e9904822f536cabf1956725890.file(/usr/local/Cellar/logstash/5.1.1/libexec/logstash-core/lib/logstash/pipeline.rb)", "org.jruby.internal.runtime.methods.JittedMethod.call(JittedMethod.java:181)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:168)", "org.jruby.ast.FCallOneArgNode.interpret(FCallOneArgNode.java:36)", "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)", "org.jruby.ast.BlockNode.interpret(BlockNode.java:71)", "org.jruby.ast.WhileNode.interpret(WhileNode.java:131)", "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)", "org.jruby.ast.BlockNode.interpret(BlockNode.java:71)", "org.jruby.evaluator.ASTInterpreter.INTERPRET_METHOD(ASTInterpreter.java:74)", "org.jruby.internal.runtime.methods.InterpretedMethod.call(InterpretedMethod.java:225)", 
"org.jruby.internal.runtime.methods.DefaultMethod.call(DefaultMethod.java:219)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:202)", "org.jruby.ast.FCallTwoArgNode.interpret(FCallTwoArgNode.java:38)", "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)", "org.jruby.ast.BlockNode.interpret(BlockNode.java:71)", "org.jruby.evaluator.ASTInterpreter.INTERPRET_BLOCK(ASTInterpreter.java:112)", "org.jruby.runtime.Interpreted19Block.evalBlockBody(Interpreted19Block.java:206)", "org.jruby.runtime.Interpreted19Block.yield(Interpreted19Block.java:194)", "org.jruby.runtime.Interpreted19Block.call(Interpreted19Block.java:125)", "org.jruby.runtime.Block.call(Block.java:101)", "org.jruby.RubyProc.call(RubyProc.java:300)", "org.jruby.RubyProc.call(RubyProc.java:230)", "org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:99)", "java.lang.Thread.run(Thread.java:745)"]}

I think that was caused by my previous filter configuration.
I tried this filter:

filter {
    if [tags][json] {
        json {
            source => "message"
        }
    }
}

Then the error appeared.

So what should I do?
I have searched all over the web but still have not found a solution.
Can you please help me?

Thank you.

if [tags][json] {

I don't know what this is supposed to mean. For now drop the conditional and focus on getting things working.
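For reference, [tags] is an array of strings, so [tags][json] reads as a lookup of a sub-field named json rather than a test for a tag. If the intent was to check for a tag called "json" (an assumption; nothing in the posted input adds such a tag), the membership form would look roughly like this:

```
filter {
    # "in" tests whether the string is one of the event's tags.
    if "json" in [tags] {
        json {
            source => "message"
        }
    }
}
```

Since nothing in the posted pipeline sets that tag, the json filter would never run with this condition either, which is one more reason to leave the conditional out for now.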

That is my old configuration.

My new one is this:
filter {
    if [message] =~ /^{.*}$/
    {
        json { source => message }
    }
}

I repeat: Drop the conditional and focus on getting things working.
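Once parsing works, the parsed keys become fields on the event, and the three values asked for in the original post can be kept on their own. A sketch, assuming the nesting shown in the sample message (ord_email under data, then cart_data); the paths would need adjusting if the real documents differ:

```
filter {
    json {
        source => "message"
    }

    mutate {
        # Lift the nested email to the top level of the event.
        rename => { "[data][cart_data][ord_email]" => "ord_email" }
    }

    mutate {
        # Drop the raw string, the rest of the nested object and the
        # unwanted top-level keys. user_id, cart_id and ord_email
        # remain, next to Logstash's own fields such as @timestamp.
        remove_field => ["message", "data", "error_status", "error_message"]
    }
}
```

The rename is kept in its own mutate block so that it clearly runs before the data object is removed.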
