How extract specific fields from JSON input with Logstash filters?


(Mike Dobrin) #1

I am currently trying to parse a JSON string using Logstash for sending to an Elastic Search DB. The input string looks like the following:

"message": """{"resourceId":"201987-20190320-05307201","body":{"orderNumber":"201987-20190320-05307201","wrapping1":{"name":null,"price":null,"taxFlag":null},"wrapping2":{"name":null,"price":null,"taxFlag":null},"package":{"608738":{"goodsTax":222,"postagePrice":0,"deliveryPrice":0,"sender":{"isOrderer":true,"familyName":"Smith",action":"CREATE","timestampMs":1553044764100}"""

I am trying a few different methods for extracting the "resourceId" field while removing all others, but have not been successful. I receive an error that reads:

Could not index event to Elasticsearch.
...
Limit of total fields [5000] in index [test2-rt_platform] has been exceeded

My settings are as follows:

input {
  kafka {
    # Target servers
    bootstrap_servers => "myMachine:9002"

    # Topic and consumer settings
    topics => ["myTopic"]
    group_id => "theConsumer"
    consumer_threads => 1

    decorate_events => true

    # Output format settings
    codec => json

    # Performance settings
    auto_commit_interval_ms => "10000"
    auto_offset_reset => "latest"
    request_timeout_ms => "7000"
    session_timeout_ms => "6000"
    heartbeat_interval_ms => "2000"
    poll_timeout_ms => 2000
    retry_backoff_ms => "1000"
    max_partition_fetch_bytes => "10485760"

  }
}

filter {
  mutate {
    add_field => {
      "[@metadata][index]" => "platform_test_%{[kafka][topic]}"
      "[@metadata][format]" => "%{[kafka][topic]}"
    }
  }
  json {
      source => "message"
  }

  mutate {
        add_field => {
	     "orderNumber22" => "%{[body][orderNumber]}"
	}

        add_field => {
            "orderNumber33" => [ "[body][orderNumber]" ]
        }
        add_field => {
           "orderNumber44" => "%{[message][body][orderNumber]}"
        }

        add_field => {
            "orderNumber55" => [ "[message][body][orderNumber]" ]
        }
        remove_field => [ "[message]" ]
}


  fingerprint {
    method => "MD5"
    key => "%{[kafka][topic]}"
    target => "[@metadata][fingerprint]"
  }
}

output {
  elasticsearch {
    action => "index"
    flush_size => 100
    document_id => "%{[@metadata][fingerprint]}"
    document_type => "%{[@metadata][format]}"
    hosts => ["myMachine:9002"]
    index => "test2-rt_platform"
    retry_max_interval => 5
    timeout => 10000
    user => "myUser"
    password => "myPassword"
  }
  stdout {
    codec => rubydebug { metadata => true }
  }

}

(system) closed #2

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.