First thank you for following up.
I am not expert in logstash and may be I didn't understand exactly what you are asking for. But I meant when I run the logstash, I don't get error in aggregation. As a matter of fact it says."Aggregate successful". Do I need to look somewhere else to see the exception?
Here is the result displays on the screen.
[logstash.agent] Successfully started Logstash API endpoint {:port=>1234}
[logstash.instrument.periodicpoller.jvm] collector name {:name=>"ParNew"}
[logstash.instrument.periodicpoller.jvm] collector name {:name=>"ConcurrentMarkSweep"}
[logstash.pipeline] filter received {"event"=>{"@version"=>"1", "@timestamp"=>2019-05-23T12:28:32.970Z, "tags"=>"in2", "control_num"=>2, "resume"=>"David blah, blah, blah"}}
[logstash.pipeline] filter received {"event"=>{"@timestamp"=>2019-05-23T12:28:32.985Z, "name"=>"Mike", "tags"=>"in1", "control_num"=>1, "@version"=>"1"}}
[logstash.pipeline] filter received {"event"=>{"@timestamp"=>2019-05-23T12:28:32.970Z, "name"=>"David", "tags"=>"in1", "control_num"=>2, "@version"=>"1"}}
[logstash.pipeline] filter received {"event"=>{"@version"=>"1", "@timestamp"=>2019-05-23T12:28:32.981Z, "tags"=>"in2", "control_num"=>1, "resume"=>"Mike blah, blah, blah"}}
[logstash.inputs.elasticsearch] Closing {:plugin=>"LogStash::Inputs::Elasticsearch"}
[logstash.filters.aggregate] Aggregate successful filter code execution {:code=>"\r\n map['tags'] = event.get('tags')\r\n if (event.get('tags') != 'in1')\r\n map['resume'] = event.get('resume')\r\n else\r\n map['name'] = event.get('name')\r\n end\r\n# event.cancel()\r\n "}
[logstash.filters.aggregate] Aggregate successful filter code execution {:code=>"\r\n map['tags'] = event.get('tags')\r\n if (event.get('tags') != 'in1')\r\n map['resume'] = event.get('resume')\r\n else\r\n map['name'] = event.get('name')\r\n end\r\n# event.cancel()\r\n "}
[logstash.filters.aggregate] Aggregate successful filter code execution {:code=>"\r\n map['tags'] = event.get('tags')\r\n if (event.get('tags') != 'in1')\r\n map['resume'] = event.get('resume')\r\n else\r\n map['name'] = event.get('name')\r\n end\r\n# event.cancel()\r\n "}
[logstash.filters.aggregate] Aggregate successful filter code execution {:code=>"\r\n map['tags'] = event.get('tags')\r\n if (event.get('tags') != 'in1')\r\n map['resume'] = event.get('resume')\r\n else\r\n map['name'] = event.get('name')\r\n end\r\n# event.cancel()\r\n "}
[logstash.pipeline] output received {"event"=>{"@timestamp"=>2019-05-23T12:28:32.970Z, "name"=>"David", "tags"=>["in1"], "control_num"=>2, "@version"=>"1"}}
[logstash.pipeline] output received {"event"=>{"@timestamp"=>2019-05-23T12:28:32.985Z, "name"=>"Mike", "tags"=>["in1"], "control_num"=>1, "@version"=>"1"}}
[logstash.pipeline] output received {"event"=>{"@version"=>"1", "@timestamp"=>2019-05-23T12:28:32.970Z, "tags"=>["in2"], "control_num"=>2, "resume"=>"David blah, blah, blah"}}
[logstash.pipeline] output received {"event"=>{"@version"=>"1", "@timestamp"=>2019-05-23T12:28:32.981Z, "tags"=>["in2"], "control_num"=>1, "resume"=>"Mike blah, blah, blah"}}
[logstash.pipeline] Pushing flush onto pipeline {:pipeline_id=>"main", :thread=>"#<Thread:0x18bd9b72 sleep>"}
[logstash.pipeline] Shutting down filter/output workers {:pipeline_id=>"main", :thread=>"#<Thread:0x18bd9b72 run>"}
[logstash.pipeline] Setting shutdown {:pipeline_id=>"main", :thread=>"#<Thread:0x18bd9b72 run>"}
[logstash.pipeline] Shutdown waiting for worker thread {:pipeline_id=>"main", :thread=>"#<Thread:0x3a1f5a9b run>"}
[logstash.pipeline] Flushing {:plugin=>#LogStash::FilterDelegator:0x54555fb2}
[logstash.filters.aggregate] Aggregate timeout for '%{control_num}' pattern: 1800 seconds
[logstash.pipeline] Flushing {:plugin=>#LogStash::FilterDelegator:0x54555fb2}
[logstash.filters.aggregate] Aggregate remove_expired_maps call with '%{control_num}' pattern and 2 maps
[logstash.filters.aggregate] Aggregate remove_expired_maps call with '%{control_num}' pattern and 2 maps
[logstash.filters.aggregate] Aggregate remove_expired_maps call with '%{control_num}' pattern and 2 maps
[logstash.filters.aggregate] Aggregate remove_expired_maps call with '%{control_num}' pattern and 2 maps
[logstash.pipeline] Flushing {:plugin=>#LogStash::FilterDelegator:0x54555fb2}
[logstash.filters.aggregate] Aggregate remove_expired_maps call with '%{control_num}' pattern and 2 maps
[logstash.filters.aggregate] Aggregate remove_expired_maps call with '%{control_num}' pattern and 2 maps
[logstash.filters.aggregate] Aggregate remove_expired_maps call with '%{control_num}' pattern and 2 maps
[logstash.filters.aggregate] Aggregate remove_expired_maps call with '%{control_num}' pattern and 2 maps
[logstash.pipeline] Shutdown waiting for worker thread {:pipeline_id=>"main", :thread=>"#<Thread:0x57e3be9 run>"}
[logstash.pipeline] Shutdown waiting for worker thread {:pipeline_id=>"main", :thread=>"#<Thread:0x1be9f9f5 dead>"}
[logstash.pipeline] Shutdown waiting for worker thread {:pipeline_id=>"main", :thread=>"#<Thread:0x4e5503 dead>"}
[logstash.pipeline] Shutdown waiting for worker thread {:pipeline_id=>"main", :thread=>"#<Thread:0x1acc2e42 dead>"}
[logstash.pipeline] Shutdown waiting for worker thread {:pipeline_id=>"main", :thread=>"#<Thread:0x40a31a40 dead>"}
[logstash.pipeline] Shutdown waiting for worker thread {:pipeline_id=>"main", :thread=>"#<Thread:0x7e56a45 dead>"}
[logstash.pipeline] Shutdown waiting for worker thread {:pipeline_id=>"main", :thread=>"#<Thread:0x50ce6a40 dead>"}
[logstash.filters.aggregate] Closing {:plugin=>"LogStash::Filters::Aggregate"}
[logstash.filters.aggregate] Aggregate close call {:code=>"\r\n map['tags'] = event.get('tags')\r\n if (event.get('tags') != 'in1')\r\n map['resume'] = event.get('resume')\r\n else\r\n map['name'] = event.get('name')\r\n end\r\n# event.cancel()\r\n "}
[logstash.outputs.elasticsearch] Closing {:plugin=>"LogStash::Outputs::ElasticSearch"}
[logstash.outputs.elasticsearch] Stopping sniffer
[logstash.outputs.elasticsearch] Stopping resurrectionist
[logstash.outputs.elasticsearch] Waiting for in use manticore connections
[logstash.outputs.elasticsearch] Closing adapter #LogStash::Outputs::ElasticSearch::HttpClient::ManticoreAdapter:0x29ab1495
[logstash.pipeline] Pipeline has terminated {:pipeline_id=>"main", :thread=>"#<Thread:0x18bd9b72 run>"}
[logstash.instrument.periodicpoller.os] Stopping
[logstash.instrument.periodicpoller.jvm] Stopping
[logstash.instrument.periodicpoller.persistentqueue] Stopping
[logstash.agent] Shutting down all pipelines {:pipelines_count=>0}
[logstash.agent] Converging pipelines state {:actions_count=>0}[INFO ][logstash.runner] Logstash shut down.