Read JSON with Logstash


#1

Hi people!

I want to parse a JSON file and send the output to Elasticsearch every minute, but I don't want to add the same JSON documents each time.
If the JSON has 10 elements, I would like my Elasticsearch index to contain only those 10 elements, refreshed every minute; that is, update an element if it already exists and add it only if it doesn't.
I have been investigating upsert but I don't quite get it.
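If I understand the documentation correctly, for each element Logstash should end up sending a bulk update request with an upsert fallback, roughly like this (my own sketch; "kube-system" is just an example id, and "logs" is the default document type):

{ "update" : { "_index" : "test", "_type" : "logs", "_id" : "kube-system" } }
{ "doc" : { "name" : "kube-system" }, "upsert" : { "name" : "kube-system" } }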

Versions I'm using: Logstash 2.4 and Elasticsearch 2.2.

This is my input JSON:
http://pastebin.com/z6KHhgYi

And this is my Logstash config:

input {
  http_poller {
    urls => {
      k8s_namespaces => "http://172.23.0.3:8080/api/v1/namespaces"
    }
    # Maximum amount of time to wait for a request to complete
    request_timeout => 30
    # How far apart requests should be (seconds)
    interval => 60
    # Decode the results as JSON
    codec => "json"
    # Store metadata about the request in this key
    metadata_target => "k8s"
  }
}

filter {
  split {
    field => "items"
  }
}

output {
  elasticsearch {
    action => "update"
    codec => "json"
    hosts => "172.16.230.48"
    document_id => "%{items.metadata.name}"
    index => "test"
    upsert => '{ "name" : "%{items.metadata.name}" }'
    doc_as_upsert => true
  }
}

And this is the error:
Attempted to send a bulk request to Elasticsearch configured at '["http://172.25.230.48:9200"]', but an error occurred and it failed! Are you sure you can reach elasticsearch from this machine using the configuration provided? {:error_message=>"[400] {"error":{"root_cause":[{"type":"illegal_argument_exception","reason":"Malformed action/metadata line [1], expected a simple value for field [_upsert] but found [START_OBJECT]"}],"type":"illegal_argument_exception","reason":"Malformed action/metadata line [1], expected a simple value for field [_upsert] but found [START_OBJECT]"},"status":400}", :error_class=>"Elasticsearch::Transport::Transport::Errors::BadRequest", :backtrace=>[
"/opt/logstash/vendor/bundle/jruby/1.9/gems/elasticsearch-transport-1.1.0/lib/elasticsearch/transport/transport/base.rb:201:in `__raise_transport_error'",
"/opt/logstash/vendor/bundle/jruby/1.9/gems/elasticsearch-transport-1.1.0/lib/elasticsearch/transport/transport/base.rb:312:in `perform_request'",
"/opt/logstash/vendor/bundle/jruby/1.9/gems/elasticsearch-transport-1.1.0/lib/elasticsearch/transport/transport/http/manticore.rb:67:in `perform_request'",
"/opt/logstash/vendor/bundle/jruby/1.9/gems/elasticsearch-transport-1.1.0/lib/elasticsearch/transport/client.rb:128:in `perform_request'",
"/opt/logstash/vendor/bundle/jruby/1.9/gems/elasticsearch-api-1.1.0/lib/elasticsearch/api/actions/bulk.rb:93:in `bulk'",
"/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-2.7.1-java/lib/logstash/outputs/elasticsearch/http_client.rb:53:in `non_threadsafe_bulk'",
"/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-2.7.1-java/lib/logstash/outputs/elasticsearch/http_client.rb:38:in `bulk'",
"org/jruby/ext/thread/Mutex.java:149:in `synchronize'",
"/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-2.7.1-java/lib/logstash/outputs/elasticsearch/http_client.rb:38:in `bulk'",
"/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-2.7.1-java/lib/logstash/outputs/elasticsearch/common.rb:172:in `safe_bulk'",
"/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-2.7.1-java/lib/logstash/outputs/elasticsearch/common.rb:101:in `submit'",
"/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-2.7.1-java/lib/logstash/outputs/elasticsearch/common.rb:86:in `retrying_submit'",
"/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-2.7.1-java/lib/logstash/outputs/elasticsearch/common.rb:29:in `multi_receive'",
"org/jruby/RubyArray.java:1653:in `each_slice'",
"/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-elasticsearch-2.7.1-java/lib/logstash/outputs/elasticsearch/common.rb:28:in `multi_receive'",
"/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.4.0-java/lib/logstash/output_delegator.rb:130:in `worker_multi_receive'",
"/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.4.0-java/lib/logstash/output_delegator.rb:114:in `multi_receive'",
"/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.4.0-java/lib/logstash/pipeline.rb:301:in `output_batch'",
"org/jruby/RubyHash.java:1342:in `each'",
"/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.4.0-java/lib/logstash/pipeline.rb:301:in `output_batch'",
"/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.4.0-java/lib/logstash/pipeline.rb:232:in `worker_loop'",
"/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.4.0-java/lib/logstash/pipeline.rb:201:in `start_workers'"], :level=>:error}

Can you help me?

Thank you


(Magnus Bäck) #2

It might not make a difference to this particular problem, but you need to change all occurrences of %{items.metadata.name} to %{[items][metadata][name]}.
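With that change, your output block would look something like this (otherwise the same as what you posted; untested):

output {
  elasticsearch {
    action => "update"
    codec => "json"
    hosts => "172.16.230.48"
    # Logstash addresses nested fields with bracket syntax;
    # a dotted name is read as one literal field name
    document_id => "%{[items][metadata][name]}"
    index => "test"
    upsert => '{ "name" : "%{[items][metadata][name]}" }'
    doc_as_upsert => true
  }
}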


#3

Thanks, I tried that, but it's still not working...

  elasticsearch {
    action => "update"
    codec => "json"
    hosts => "172.25.230.48"
    document_id => "%{[items][metadata][name]}"
    index => "test"
    upsert => '{ "name" : "%{[items][metadata][name]}" }'
    doc_as_upsert => true
  }

With the same result:
Attempted to send a bulk request to Elasticsearch configured at '["http://172.25.230.48:9200"]', but an error occurred and it failed! Are you sure you can reach elasticsearch from this machine using the configuration provided? {:error_message=>"[400] {"error":{"root_cause":[{"type":"illegal_argument_exception","reason":"Malformed action/metadata line [1], expected a simple value for field [_upsert] but found [START_OBJECT]"}],"type":"illegal_argument_exception","reason":"Malformed action/metadata line [1], expected a simple value for field [_upsert] but found [START_OBJECT]"},"status":400}", :error_class=>"Elasticsearch::Transport::Transport::Errors::BadRequest", :backtrace=>[same backtrace as above], :level=>:error}

Any other suggestion?

Thank you


(Christian Dahlqvist) #4

Can you try removing the JSON codec? The Elasticsearch output plugin converts to JSON automatically, so I do not believe this codec should be present.


#5

Thanks, I removed the line, but it must be something else, because it still shows the same error.

  elasticsearch {
    action => "update"
    # codec => "json"
    hosts => "172.25.230.48"
    document_id => "%{[items][metadata][name]}"
    index => "test"
    upsert => '{ "name" : "%{[items][metadata][name]}" }'
    doc_as_upsert => true
  }

Any suggestion?

Thank you very much


#6

I got it: I had to change doc_as_upsert to false.
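For anyone who hits the same error, my output block now looks roughly like this (same settings as before, with that one change; with doc_as_upsert => false the separate upsert document is used when the id does not exist yet):

output {
  elasticsearch {
    action => "update"
    hosts => "172.25.230.48"
    document_id => "%{[items][metadata][name]}"
    index => "test"
    # used as the initial document when the id is new
    upsert => '{ "name" : "%{[items][metadata][name]}" }'
    # combining upsert with doc_as_upsert => true produced the
    # malformed bulk request shown above
    doc_as_upsert => false
  }
}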

Now I have another problem: the new changes are updated fine, but if an item no longer exists in my JSON, I still see it in my Elasticsearch index.

How can I keep only the latest update/version of my JSON file in Elasticsearch?

Is the only way to do this to delete the entire index before each update? That seems quite radical, no? I'm very stuck on this!

Any suggestions?

Thank you


(system) #7

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.