Cannot combine two indexes into one using Logstash

I have the following two indexes:
PUT twitter1/doc/1
{
  "name" : "Mike",
  "c_num" : 1,
  "tags" : "in1"
}

PUT twitter2/doc/1
{
  "resume" : "Mike blah, blah, blah",
  "c_num" : 1,
  "tags" : "in2"
}

I would like a final index that combines the two, with both "name" and "resume" for each c_num, like below:

twitter4
{
  "resume" : "Mike blah, blah, blah",
  "name" : "Mike",
  "c_num" : 1
}
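To make the intent concrete, here is the join I am after written as plain Ruby over the two sample documents (field names as in the examples above; this is just an illustration of the desired merge, not Logstash code):

```ruby
# The two sample documents from twitter1 and twitter2:
twitter1 = { "name" => "Mike", "c_num" => 1, "tags" => "in1" }
twitter2 = { "resume" => "Mike blah, blah, blah", "c_num" => 1, "tags" => "in2" }

# One merged document per c_num, carrying fields from both sides:
merged = {}
[twitter1, twitter2].each do |doc|
  key = doc["c_num"]
  merged[key] ||= { "c_num" => key }
  merged[key]["name"]   = doc["name"]   if doc["name"]
  merged[key]["resume"] = doc["resume"] if doc["resume"]
end

p merged[1]
```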

I used the following Logstash configuration, but for some reason the filter doesn't join them together:
input {
  elasticsearch {
    hosts => "http://11.222.33.44:9200"
    user => "user"
    password => "pwd"
    index => "twitter1"
  }
  elasticsearch {
    hosts => "http://11.222.33.44:9200"
    user => "user"
    password => "pwd"
    index => "twitter2"
  }
}

filter {
  aggregate {
    task_id => "%{c_num}"
    code => "
      if (event.get('tags').include('in1'))
        map['name'] = event.get('name')
      else
        map['resume'] = event.get('resume')
      end
      event.cancel
    "
    inactivity_timeout => 300  # seconds since last event
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "c_num"
  }
}

output {
  elasticsearch {
    hosts => "http://11.222.33.44:9200"
    ssl_certificate_verification => "false"
    index => "twitter4"
    document_id => "%{c_num}"
  }
}
I am getting the following error:

"Aggregate exception occurred {:error=>#<NoMethodError: undefined method `include' for "in1":String>,"

and the result is not what I want. I am open to any alternative Logstash solution.

You have not made tags an array; it is just a string. So you should do a string equality test, not an .include test.
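In plain Ruby, which is what the aggregate filter's code option runs, a String has include? (with a question mark) but no include method, which is exactly the NoMethodError in the log. A quick check outside Logstash:

```ruby
tags = "in1"   # event.get('tags') returns this plain string

# .include (no question mark) does not exist on String:
begin
  tags.include("in1")
rescue NoMethodError => e
  puts e.class   # the same error class seen in the Logstash log
end

# These both work on a string; equality is the right test here:
puts tags.include?("in1")
puts tags == "in1"
```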

Thanks Badger for your comment. I changed the filter as below:
filter {
  aggregate {
    task_id => "%{c_num}"
    code => "
      if (event.get('tags') != 'in1')
        map['resume'] = event.get('resume')
      else
        map['name'] = event.get('name')
      end
      event.cancel
    "
    inactivity_timeout => 300  # seconds since last event
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "c_num"
  }
}
But I get one record from twitter1 and one from twitter2, and they are not combined:
"hits" : [
{
"_index" : "twitter4",
"_type" : "doc",
"_id" : "2",
"_score" : 1.0,
"_source" : {
"@version" : "1",
"@timestamp" : "2019-05-21T19:54:23.850Z",
"resume" : "David blah, blah, blah",
"c_num" : 2,
"tags" : [
"in2",
"_aggregateexception"
]
}
},
{
"_index" : "twitter4",
"_type" : "doc",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"@version" : "1",
"@timestamp" : "2019-05-21T19:54:23.864Z",
"name" : "Mike",
"c_num" : 1,
"tags" : [
"in1",
"_aggregateexception"
]
}

They have different c_num values, so I would not expect them to be aggregated. Does the aggregate filter log an exception?

The aggregate filter doesn't log an exception.
In my example, twitter1 and twitter2 have the same c_num values. The whole idea is to combine the two indexes into one index that has the columns from both.
One index has a column "name" and the other has a column "resume" for the same c_num.
I would like the final index twitter4 to have both the "name" and "resume" columns for the same c_num.
Below should be the final result:
twitter4
{
  "resume" : "Mike blah, blah, blah",
  "name" : "Mike",
  "c_num" : 1
}
So I am looking for the correct filter to do this.
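For reference, the pattern usually suggested for this kind of two-index join is to merge whichever fields each event carries into the map unconditionally, cancel the partial events, and let the timeout push one combined event per task_id. This is a sketch only, not tested against this data; the timeout value is an assumption:

filter {
  aggregate {
    task_id => "%{c_num}"
    code => "
      # copy whichever fields this event carries into the shared map
      map['c_num']  ||= event.get('c_num')
      map['name']   ||= event.get('name')
      map['resume'] ||= event.get('resume')
      # drop the partial event; only the merged map should reach the output
      event.cancel
    "
    push_map_as_event_on_timeout => true
    timeout => 10
    timeout_task_id_field => "c_num"
  }
}

Note that the aggregate filter documentation also requires a single pipeline worker (bin/logstash -w 1), since all events for a given task_id must pass through the same filter instance; with the default worker count the two partial events can land in different workers and never meet.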

It is hard to believe that no exception is logged when the events output include an _aggregateexception tag.

First, thank you for following up.
I am not an expert in Logstash, and maybe I didn't understand exactly what you are asking for. But I meant that when I run Logstash, I don't get an error during aggregation. As a matter of fact, it says "Aggregate successful". Do I need to look somewhere else to see the exception?
Here is the output displayed on the screen:
[logstash.agent] Successfully started Logstash API endpoint {:port=>1234}
[logstash.instrument.periodicpoller.jvm] collector name {:name=>"ParNew"}
[logstash.instrument.periodicpoller.jvm] collector name {:name=>"ConcurrentMarkSweep"}
[logstash.pipeline] filter received {"event"=>{"@version"=>"1", "@timestamp"=>2019-05-23T12:28:32.970Z, "tags"=>"in2", "control_num"=>2, "resume"=>"David blah, blah, blah"}}
[logstash.pipeline] filter received {"event"=>{"@timestamp"=>2019-05-23T12:28:32.985Z, "name"=>"Mike", "tags"=>"in1", "control_num"=>1, "@version"=>"1"}}
[logstash.pipeline] filter received {"event"=>{"@timestamp"=>2019-05-23T12:28:32.970Z, "name"=>"David", "tags"=>"in1", "control_num"=>2, "@version"=>"1"}}
[logstash.pipeline] filter received {"event"=>{"@version"=>"1", "@timestamp"=>2019-05-23T12:28:32.981Z, "tags"=>"in2", "control_num"=>1, "resume"=>"Mike blah, blah, blah"}}
[logstash.inputs.elasticsearch] Closing {:plugin=>"LogStash::Inputs::Elasticsearch"}
[logstash.filters.aggregate] Aggregate successful filter code execution {:code=>"\r\n map['tags'] = event.get('tags')\r\n if (event.get('tags') != 'in1')\r\n map['resume'] = event.get('resume')\r\n else\r\n map['name'] = event.get('name')\r\n end\r\n# event.cancel()\r\n "}
[logstash.filters.aggregate] Aggregate successful filter code execution {:code=>"\r\n map['tags'] = event.get('tags')\r\n if (event.get('tags') != 'in1')\r\n map['resume'] = event.get('resume')\r\n else\r\n map['name'] = event.get('name')\r\n end\r\n# event.cancel()\r\n "}
[logstash.filters.aggregate] Aggregate successful filter code execution {:code=>"\r\n map['tags'] = event.get('tags')\r\n if (event.get('tags') != 'in1')\r\n map['resume'] = event.get('resume')\r\n else\r\n map['name'] = event.get('name')\r\n end\r\n# event.cancel()\r\n "}
[logstash.filters.aggregate] Aggregate successful filter code execution {:code=>"\r\n map['tags'] = event.get('tags')\r\n if (event.get('tags') != 'in1')\r\n map['resume'] = event.get('resume')\r\n else\r\n map['name'] = event.get('name')\r\n end\r\n# event.cancel()\r\n "}
[logstash.pipeline] output received {"event"=>{"@timestamp"=>2019-05-23T12:28:32.970Z, "name"=>"David", "tags"=>["in1"], "control_num"=>2, "@version"=>"1"}}
[logstash.pipeline] output received {"event"=>{"@timestamp"=>2019-05-23T12:28:32.985Z, "name"=>"Mike", "tags"=>["in1"], "control_num"=>1, "@version"=>"1"}}
[logstash.pipeline] output received {"event"=>{"@version"=>"1", "@timestamp"=>2019-05-23T12:28:32.970Z, "tags"=>["in2"], "control_num"=>2, "resume"=>"David blah, blah, blah"}}
[logstash.pipeline] output received {"event"=>{"@version"=>"1", "@timestamp"=>2019-05-23T12:28:32.981Z, "tags"=>["in2"], "control_num"=>1, "resume"=>"Mike blah, blah, blah"}}
[logstash.pipeline] Pushing flush onto pipeline {:pipeline_id=>"main", :thread=>"#<Thread:0x18bd9b72 sleep>"}
[logstash.pipeline] Shutting down filter/output workers {:pipeline_id=>"main", :thread=>"#<Thread:0x18bd9b72 run>"}
[logstash.pipeline] Setting shutdown {:pipeline_id=>"main", :thread=>"#<Thread:0x18bd9b72 run>"}
[logstash.pipeline] Shutdown waiting for worker thread {:pipeline_id=>"main", :thread=>"#<Thread:0x3a1f5a9b run>"}
[logstash.pipeline] Flushing {:plugin=>#LogStash::FilterDelegator:0x54555fb2}
[logstash.filters.aggregate] Aggregate timeout for '%{control_num}' pattern: 1800 seconds
[logstash.pipeline] Flushing {:plugin=>#LogStash::FilterDelegator:0x54555fb2}
[logstash.filters.aggregate] Aggregate remove_expired_maps call with '%{control_num}' pattern and 2 maps
[logstash.filters.aggregate] Aggregate remove_expired_maps call with '%{control_num}' pattern and 2 maps
[logstash.filters.aggregate] Aggregate remove_expired_maps call with '%{control_num}' pattern and 2 maps
[logstash.filters.aggregate] Aggregate remove_expired_maps call with '%{control_num}' pattern and 2 maps
[logstash.pipeline] Flushing {:plugin=>#LogStash::FilterDelegator:0x54555fb2}
[logstash.filters.aggregate] Aggregate remove_expired_maps call with '%{control_num}' pattern and 2 maps
[logstash.filters.aggregate] Aggregate remove_expired_maps call with '%{control_num}' pattern and 2 maps
[logstash.filters.aggregate] Aggregate remove_expired_maps call with '%{control_num}' pattern and 2 maps
[logstash.filters.aggregate] Aggregate remove_expired_maps call with '%{control_num}' pattern and 2 maps
[logstash.pipeline] Shutdown waiting for worker thread {:pipeline_id=>"main", :thread=>"#<Thread:0x57e3be9 run>"}
[logstash.pipeline] Shutdown waiting for worker thread {:pipeline_id=>"main", :thread=>"#<Thread:0x1be9f9f5 dead>"}
[logstash.pipeline] Shutdown waiting for worker thread {:pipeline_id=>"main", :thread=>"#<Thread:0x4e5503 dead>"}
[logstash.pipeline] Shutdown waiting for worker thread {:pipeline_id=>"main", :thread=>"#<Thread:0x1acc2e42 dead>"}
[logstash.pipeline] Shutdown waiting for worker thread {:pipeline_id=>"main", :thread=>"#<Thread:0x40a31a40 dead>"}
[logstash.pipeline] Shutdown waiting for worker thread {:pipeline_id=>"main", :thread=>"#<Thread:0x7e56a45 dead>"}
[logstash.pipeline] Shutdown waiting for worker thread {:pipeline_id=>"main", :thread=>"#<Thread:0x50ce6a40 dead>"}
[logstash.filters.aggregate] Closing {:plugin=>"LogStash::Filters::Aggregate"}
[logstash.filters.aggregate] Aggregate close call {:code=>"\r\n map['tags'] = event.get('tags')\r\n if (event.get('tags') != 'in1')\r\n map['resume'] = event.get('resume')\r\n else\r\n map['name'] = event.get('name')\r\n end\r\n# event.cancel()\r\n "}
[logstash.outputs.elasticsearch] Closing {:plugin=>"LogStash::Outputs::ElasticSearch"}
[logstash.outputs.elasticsearch] Stopping sniffer
[logstash.outputs.elasticsearch] Stopping resurrectionist
[logstash.outputs.elasticsearch] Waiting for in use manticore connections
[logstash.outputs.elasticsearch] Closing adapter #LogStash::Outputs::ElasticSearch::HttpClient::ManticoreAdapter:0x29ab1495
[logstash.pipeline] Pipeline has terminated {:pipeline_id=>"main", :thread=>"#<Thread:0x18bd9b72 run>"}
[logstash.instrument.periodicpoller.os] Stopping
[logstash.instrument.periodicpoller.jvm] Stopping
[logstash.instrument.periodicpoller.persistentqueue] Stopping
[logstash.agent] Shutting down all pipelines {:pipelines_count=>0}
[logstash.agent] Converging pipelines state {:actions_count=>0}
[INFO ][logstash.runner] Logstash shut down.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.