How to edit data coming in from Kafka within Logstash and ingest it into Elasticsearch

I have two indexes in Elasticsearch, (indexA) and (indexB), that are being fed through Kafka. I want to take some fields from (indexB) and a few from (indexA) and build a third index, (indexC), out of those fields.

I have tried using the elasticsearch input plugin to pull the already indexed data back out, process it in the config file, and post it back into Elasticsearch, but apart from making copies of the same indexes I could not achieve anything.

This is the config file I tried with one index:

input {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "analytics*"
    query => '{ "query": { "query_string": { "query": "*" } } }'
    size => 500
    scroll => "5m"
    docinfo => true
  }
}

filter {
  mutate {
    # "user.id" is treated as a literal field name here; a nested field
    # would normally be referenced as "[user][id]"
    remove_field => [ "user.id" ]
  }
}

output {
  elasticsearch {
    index => "copy.%{[@metadata][_index]}"
    document_type => "%{[@metadata][_type]}"
    document_id => "%{[@metadata][_id]}"
  }
}

My goal is to take the (username), (topic), and (data) fields from (indexA), the distinct (username) count from (indexB) using the cardinality aggregation, and build a new (indexC) from this data.
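For the (indexA) side, this is roughly the kind of pipeline I have in mind, reading straight from the Kafka topic instead of re-reading the index. The broker address, topic name, and field names below are placeholders, not my actual setup:

input {
  kafka {
    bootstrap_servers => "localhost:9092"   # placeholder broker
    topics => ["topicA"]                    # placeholder topic feeding indexA
    codec => "json"
  }
}

filter {
  # keep only the fields I want in indexC; prune matches field names against regexes
  prune {
    whitelist_names => ["^username$", "^topic$", "^data$", "^@timestamp$"]
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "indexC"
  }
}

That part seems straightforward; what I cannot figure out is how to combine it with the distinct count from (indexB).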

The cardinality aggregation I used:

{
  "aggs": {
    "user_count": {
      "cardinality": {
        "field": "data.user.name.keyword"
      }
    }
  }
}
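From what I have read, the elasticsearch input plugin only streams document hits back into the pipeline, so I do not see a way to get aggregation results like this into Logstash directly. One idea I am considering is letting Elasticsearch materialize the distinct count itself with a transform and skipping Logstash for that part. A rough sketch, where the transform name, group-by field, and destination index are placeholders:

PUT _transform/indexb-user-count
{
  "source": { "index": "indexB" },
  "pivot": {
    "group_by": {
      "topic": { "terms": { "field": "topic.keyword" } }
    },
    "aggregations": {
      "user_count": {
        "cardinality": { "field": "data.user.name.keyword" }
      }
    }
  },
  "dest": { "index": "indexC" }
}

I am not sure whether that is the right direction, or whether this can all be done inside the Logstash config.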

Any suggestions on how I can solve this?
